diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 44efadd6d899..510f00690a3c 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -20,12 +20,13 @@ package org.apache.iceberg.actions;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteFiles;
@@ -37,6 +38,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.RewriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
@@ -196,7 +198,7 @@ public BaseRewriteDataFilesAction filter(Expression expr) {
   @Override
   public RewriteDataFilesActionResult execute() {
-    CloseableIterable fileScanTasks = null;
+    CloseableIterable fileScanTasks = CloseableIterable.empty();
     try {
       fileScanTasks = table.newScan()
           .caseSensitive(caseSensitive)
@@ -215,13 +217,14 @@ public RewriteDataFilesActionResult execute() {
     Map> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
     Map> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
+        .filter(partitionTasks -> doPartitionNeedRewrite(partitionTasks.getValue()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    // Nothing to rewrite if there's only one DataFile in each partition.
+    // Nothing to rewrite if there's only one file in each partition.
     if (filteredGroupedTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
+    // Split and combine tasks under each partition.
     List combinedScanTasks = filteredGroupedTasks.values().stream()
         .map(scanTasks -> {
@@ -230,22 +233,26 @@ public RewriteDataFilesActionResult execute() {
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
-        .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
+        .filter(this::doTaskNeedRewrite)
         .collect(Collectors.toList());
     if (combinedScanTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
-    List addedDataFiles = rewriteDataForTasks(combinedScanTasks);
-    List currentDataFiles = combinedScanTasks.stream()
-        .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
-        .collect(Collectors.toList());
-    replaceDataFiles(currentDataFiles, addedDataFiles);
+    // Execute the real rewrite tasks in parallel.
+    RewriteResult rewriteResult = rewriteDataForTasks(combinedScanTasks);
-    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
-  }
+    // Commit the RewriteFiles transaction to the Iceberg table.
+    replaceDataFiles(rewriteResult);
+    return new RewriteDataFilesActionResult(
+        Lists.newArrayList(rewriteResult.dataFilesToDelete()),
+        Lists.newArrayList(rewriteResult.deleteFilesToDelete()),
+        Lists.newArrayList(rewriteResult.dataFilesToAdd()),
+        Lists.newArrayList(rewriteResult.deleteFilesToAdd())
+    );
+  }
   private Map> groupTasksByPartition(
       CloseableIterator tasksIter) {
@@ -262,31 +269,64 @@ private Map> groupTasksByPartition(
     return tasksGroupedByPartition.asMap();
   }
-  private void replaceDataFiles(Iterable deletedDataFiles, Iterable addedDataFiles) {
+  private void replaceDataFiles(RewriteResult result) {
     try {
       RewriteFiles rewriteFiles = table.newRewrite();
-      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+
+      rewriteFiles.rewriteFiles(
+          Sets.newHashSet(result.dataFilesToDelete()),
+          Sets.newHashSet(result.deleteFilesToDelete()),
+          Sets.newHashSet(result.dataFilesToAdd()),
+          Sets.newHashSet(result.deleteFilesToAdd())
+      );
+
       commit(rewriteFiles);
     } catch (Exception e) {
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+      // Remove all the newly produced files if possible.
+      Iterable> addedFiles = Iterables.concat(
+          Arrays.asList(result.dataFilesToAdd()),
+          Arrays.asList(result.deleteFilesToAdd())
+      );
+
+      Tasks.foreach(Iterables.transform(addedFiles, f -> f.path().toString()))
          .noRetry()
          .suppressFailureWhenFinished()
          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
          .run(fileIO::deleteFile);
+      throw e;
     }
   }
-  private boolean isPartialFileScan(CombinedScanTask task) {
+  private boolean doPartitionNeedRewrite(Collection partitionTasks) {
+    int files = 0;
+    for (FileScanTask scanTask : partitionTasks) {
+      files += 1; // One for the data file.
+      files += scanTask.deletes().size();
+    }
+    return files > 1;
+  }
+
+  private boolean doTaskNeedRewrite(CombinedScanTask task) {
+    Preconditions.checkArgument(task != null && task.files().size() > 0,
+        "Files in CombinedScanTask must not be null or empty");
     if (task.files().size() == 1) {
-      FileScanTask fileScanTask = task.files().iterator().next();
-      return fileScanTask.file().fileSizeInBytes() != fileScanTask.length();
+      FileScanTask scanTask = task.files().iterator().next();
+      if (scanTask.deletes().size() > 0) {
+        // There is one data file plus one or more delete files, so rewrite them into a single data file.
+        return true;
+      } else {
+        // There is only one data file. If the task happens to cover the complete data file, then we don't
+        // need to do the real rewrite action.
+        return scanTask.file().fileSizeInBytes() != scanTask.length();
+      }
     } else {
-      return false;
+      // There are multiple FileScanTasks, so a rewrite is needed.
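(For reference, the decision implemented by doPartitionNeedRewrite and doTaskNeedRewrite above reduces to a small predicate. A minimal sketch, not part of the patch, with hypothetical parameter names standing in for the FileScanTask/CombinedScanTask state:

    // True when a combined task is worth rewriting: several files can be bin-packed into one,
    // a lone data file still carries delete files, or only part of a single file is scanned.
    static boolean needsRewrite(int dataFileCount, int deleteFileCount, boolean scansWholeFile) {
      if (dataFileCount > 1) {
        return true;
      }
      if (deleteFileCount > 0) {
        return true;
      }
      return !scansWholeFile;
    }

The single-file-without-deletes case maps to the fileSizeInBytes() != length() check: a partial scan of one file still gets rewritten.)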
+ return true; } } protected abstract FileIO fileIO(); - protected abstract List rewriteDataForTasks(List combinedScanTask); + protected abstract RewriteResult rewriteDataForTasks(List combinedScanTask); } diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java index 7313d9cb0418..86a8238154ca 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public class RewriteDataFilesActionResult { @@ -29,11 +30,21 @@ public class RewriteDataFilesActionResult { new RewriteDataFilesActionResult(ImmutableList.of(), ImmutableList.of()); private List deletedDataFiles; + private List deletedDeleteFiles; + private List addedDataFiles; + private List addedDeleteFiles; public RewriteDataFilesActionResult(List deletedDataFiles, List addedDataFiles) { + this(deletedDataFiles, ImmutableList.of(), addedDataFiles, ImmutableList.of()); + } + + public RewriteDataFilesActionResult(List deletedDataFiles, List deletedDeleteFiles, + List addedDataFiles, List addedDeleteFiles) { this.deletedDataFiles = deletedDataFiles; + this.deletedDeleteFiles = deletedDeleteFiles; this.addedDataFiles = addedDataFiles; + this.addedDeleteFiles = addedDeleteFiles; } static RewriteDataFilesActionResult empty() { @@ -44,7 +55,15 @@ public List deletedDataFiles() { return deletedDataFiles; } + public List deletedDeleteFiles() { + return deletedDeleteFiles; + } + public List addedDataFiles() { return addedDataFiles; } + + public List addedDeleteFiles() { + return addedDeleteFiles; + } } diff --git a/core/src/main/java/org/apache/iceberg/io/RewriteResult.java b/core/src/main/java/org/apache/iceberg/io/RewriteResult.java new file mode 100644 index 000000000000..b62051fa7bb8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RewriteResult.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Set; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +public class RewriteResult implements Serializable { + // Files to delete. + private final DataFile[] dataFilesToDelete; + private final DeleteFile[] deleteFilesToDelete; + + // Files to add. 
+ private final DataFile[] dataFilesToAdd; + private final DeleteFile[] deleteFilesToAdd; + + private RewriteResult(Set dataFilesToDelete, + Set deleteFilesToDelete, + Set dataFilesToAdd, + Set deleteFilesToAdd) { + this.dataFilesToDelete = dataFilesToDelete.toArray(new DataFile[0]); + this.deleteFilesToDelete = deleteFilesToDelete.toArray(new DeleteFile[0]); + this.dataFilesToAdd = dataFilesToAdd.toArray(new DataFile[0]); + this.deleteFilesToAdd = deleteFilesToAdd.toArray(new DeleteFile[0]); + } + + public DataFile[] dataFilesToDelete() { + return dataFilesToDelete; + } + + public DeleteFile[] deleteFilesToDelete() { + return deleteFilesToDelete; + } + + public DataFile[] dataFilesToAdd() { + return dataFilesToAdd; + } + + public DeleteFile[] deleteFilesToAdd() { + return deleteFilesToAdd; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final Set dataFilesToDelete = Sets.newHashSet(); + private final Set deleteFilesToDelete = Sets.newHashSet(); + private final Set dataFilesToAdd = Sets.newHashSet(); + private final Set deleteFilesToAdd = Sets.newHashSet(); + + public Builder addDataFilesToDelete(DataFile... dataFiles) { + Collections.addAll(dataFilesToDelete, dataFiles); + return this; + } + + public Builder addDataFilesToDelete(Iterable dataFiles) { + Iterables.addAll(dataFilesToDelete, dataFiles); + return this; + } + + public Builder addDeleteFilesToDelete(DeleteFile... deleteFiles) { + Collections.addAll(deleteFilesToDelete, deleteFiles); + return this; + } + + public Builder addDeleteFilesToDelete(Iterable deleteFiles) { + Iterables.addAll(deleteFilesToDelete, deleteFiles); + return this; + } + + public Builder addDataFilesToAdd(DataFile... dataFiles) { + Collections.addAll(dataFilesToAdd, dataFiles); + return this; + } + + public Builder addDataFilesToAdd(Iterable dataFiles) { + Iterables.addAll(dataFilesToAdd, dataFiles); + return this; + } + + public Builder addDeleteFilesToAdd(Iterable deleteFiles) { + Iterables.addAll(deleteFilesToAdd, deleteFiles); + return this; + } + + public Builder addDeleteFilesToAdd(DeleteFile... deleteFiles) { + Collections.addAll(deleteFilesToAdd, deleteFiles); + return this; + } + + public Builder merge(Iterable results) { + for (RewriteResult result : results) { + Collections.addAll(dataFilesToDelete, result.dataFilesToDelete); + Collections.addAll(deleteFilesToDelete, result.deleteFilesToDelete); + Collections.addAll(dataFilesToAdd, result.dataFilesToAdd); + Collections.addAll(deleteFilesToAdd, result.deleteFilesToAdd); + } + return this; + } + + public Builder merge(RewriteResult... 
results) { + for (RewriteResult result : results) { + Collections.addAll(dataFilesToDelete, result.dataFilesToDelete); + Collections.addAll(deleteFilesToDelete, result.deleteFilesToDelete); + Collections.addAll(dataFilesToAdd, result.dataFilesToAdd); + Collections.addAll(deleteFilesToAdd, result.deleteFilesToAdd); + } + return this; + } + + public RewriteResult build() { + return new RewriteResult(dataFilesToDelete, deleteFilesToDelete, dataFilesToAdd, deleteFilesToAdd); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java index 291be0ca26bf..e8df6bf2628a 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java +++ b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java @@ -23,11 +23,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import org.apache.iceberg.actions.BaseRewriteDataFilesAction; import org.apache.iceberg.flink.source.RowDataRewriter; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.RewriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class RewriteDataFilesAction extends BaseRewriteDataFilesAction { @@ -47,9 +47,10 @@ protected FileIO fileIO() { } @Override - protected List rewriteDataForTasks(List combinedScanTasks) { + protected RewriteResult rewriteDataForTasks(List combinedScanTasks) { int size = combinedScanTasks.size(); int parallelism = Math.min(size, maxParallelism); + DataStream dataStream = env.fromCollection(combinedScanTasks); RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager()); try { diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index b26ecf0f38bb..8080ae56745e 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -19,18 +19,16 @@ package org.apache.iceberg.flink.source; -import java.util.Collection; -import java.util.List; import java.util.Locale; -import java.util.stream.Collectors; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.CloseableIterator; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -39,8 +37,8 @@ import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.RewriteResult; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +50,6 @@ public class 
RowDataRewriter { private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class); private final Schema schema; - private final FileFormat format; private final String nameMapping; private final FileIO io; private final boolean caseSensitive; @@ -70,7 +67,7 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); - this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + FileFormat format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); RowType flinkSchema = FlinkSchemaUtil.convert(table.schema()); this.taskWriterFactory = new RowDataTaskWriterFactory( table.schema(), @@ -82,19 +79,27 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption Long.MAX_VALUE, format, table.properties(), + // TODO Need to fill the latest equality field ids if plan to produce delete files after rewrite. null); } - public List rewriteDataForTasks(DataStream dataStream, int parallelism) throws Exception { + public RewriteResult rewriteDataForTasks(DataStream dataStream, int parallelism) throws Exception { RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory); - DataStream> ds = dataStream.map(map).setParallelism(parallelism); - return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream) - .collect(Collectors.toList()); + + RewriteResult.Builder builder = RewriteResult.builder(); + + try (CloseableIterator results = dataStream.map(map) + .setParallelism(parallelism) + .executeAndCollect("Rewrite table :" + tableName)) { + + builder.merge(() -> results); + } + + return builder.build(); } - public static class RewriteMap extends RichMapFunction> { + public static class RewriteMap extends RichMapFunction { - private TaskWriter writer; private int subTaskId; private int attemptId; @@ -124,16 +129,28 @@ public void open(Configuration parameters) { } @Override - public List map(CombinedScanTask task) throws Exception { + public RewriteResult map(CombinedScanTask task) throws Exception { + // Initialize the builder of ResultWriter. + RewriteResult.Builder resultBuilder = RewriteResult.builder(); + for (FileScanTask scanTask : task.files()) { + resultBuilder.addDataFilesToDelete(scanTask.file()); + resultBuilder.addDeleteFilesToDelete(scanTask.deletes()); + } + // Initialize the task writer. - this.writer = taskWriterFactory.create(); - try (RowDataIterator iterator = - new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) { + TaskWriter writer = taskWriterFactory.create(); + try (RowDataIterator iterator = new RowDataIterator(task, io, encryptionManager, schema, + schema, nameMapping, caseSensitive)) { + while (iterator.hasNext()) { RowData rowData = iterator.next(); writer.write(rowData); } - return Lists.newArrayList(writer.dataFiles()); + + // Add the data files only because deletions from delete files has been eliminated. 
+ resultBuilder.addDataFilesToAdd(writer.dataFiles()); + + return resultBuilder.build(); } catch (Throwable originalThrowable) { try { LOG.error("Aborting commit for (subTaskId {}, attemptId {})", subTaskId, attemptId); diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java index 28103263172e..dec9e82e5f6f 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java +++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java @@ -87,7 +87,7 @@ protected TableEnvironment getTableEnv() { return tEnv; } - protected static TableResult exec(TableEnvironment env, String query, Object... args) { + public static TableResult exec(TableEnvironment env, String query, Object... args) { return env.executeSql(String.format(query, args)); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java index b4fb243edd4f..2b441cfed4c1 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java +++ b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java @@ -22,36 +22,59 @@ import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.actions.RewriteDataFilesActionResult; -import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.FlinkTableOptions; +import org.apache.iceberg.flink.FlinkTestBase; +import org.apache.iceberg.flink.MiniClusterResource; import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import 
org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.junit.After; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -61,39 +84,54 @@ import static org.apache.iceberg.flink.SimpleDataUtil.RECORD; @RunWith(Parameterized.class) -public class TestRewriteDataFilesAction extends FlinkCatalogTestBase { +public class TestRewriteDataFilesAction { + private static final AtomicInteger FILE_COUNT = new AtomicInteger(1); + private static final Configuration CONF = new Configuration(); + private static final String CATALOG = "test_catalog"; + private static final String DATABASE = "test_database"; + + @ClassRule + public static final TemporaryFolder HADOOP_WAREHOUSE = new TemporaryFolder(); + + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = + MiniClusterResource.createWithClassloaderCheckDisabled(); private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned"; private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned"; + private final FileFormat format; + private final int formatVersion; + private final TableEnvironment tEnv; + private Table icebergTableUnPartitioned; private Table icebergTablePartitioned; - public TestRewriteDataFilesAction(String catalogName, Namespace baseNamespace, FileFormat format) { - super(catalogName, baseNamespace); - this.format = format; + @Parameterized.Parameters(name = "format={0}, formatVersion={1}") + public static Iterable parameters() { + return Lists.newArrayList( + new Object[] {"avro", 1}, + new Object[] {"avro", 2}, + new Object[] {"orc", 1}, + new Object[] {"parquet", 1}, + new Object[] {"parquet", 2} + ); } - @Override - protected TableEnvironment getTableEnv() { - super.getTableEnv() - .getConfig() + public TestRewriteDataFilesAction(String format, int formatVersion) { + this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + this.formatVersion = formatVersion; + this.tEnv = TableEnvironment.create( + EnvironmentSettings + .newInstance() + .useBlinkPlanner() + .inBatchMode() + .build() + ); + this.tEnv.getConfig() .getConfiguration() + .set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false) .set(CoreOptions.DEFAULT_PARALLELISM, 1); - return super.getTableEnv(); - } - - @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}") - public static Iterable parameters() { - List parameters = Lists.newArrayList(); - for (FileFormat format : new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) { - for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { - String catalogName = (String) catalogParams[0]; - Namespace baseNamespace = (Namespace) catalogParams[1]; - parameters.add(new Object[] {catalogName, baseNamespace, format}); - } - } - return parameters; } @Rule @@ -101,28 +139,61 @@ public static Iterable parameters() { @Before public void before() { - super.before(); - sql("CREATE DATABASE %s", flinkDatabase); - sql("USE CATALOG %s", catalogName); + String warehouseRoot = String.format("file://%s", HADOOP_WAREHOUSE.getRoot().getAbsolutePath()); + // Create catalog and use it. + sql("CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", CATALOG, warehouseRoot); + sql("USE CATALOG %s", CATALOG); + + // Create database and use it. 
+ sql("CREATE DATABASE %s", DATABASE); sql("USE %s", DATABASE); - sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED, - format.name()); - icebergTableUnPartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, - TABLE_NAME_UNPARTITIONED)); + // Create tables. + sql("CREATE TABLE %s (id int, data varchar) " + + " WITH (" + + " 'write.format.default'='%s'" + + " )", + TABLE_NAME_UNPARTITIONED, format.name()); sql("CREATE TABLE %s (id int, data varchar,spec varchar) " + - " PARTITIONED BY (data,spec) with ('write.format.default'='%s')", + " PARTITIONED BY (data,spec)" + + " WITH (" + + " 'write.format.default'='%s'" + + " )", TABLE_NAME_PARTITIONED, format.name()); - icebergTablePartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, - TABLE_NAME_PARTITIONED)); + + HadoopCatalog catalog = new HadoopCatalog(); + catalog.setConf(CONF); + catalog.initialize("hadoop_catalog", ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseRoot)); + + icebergTableUnPartitioned = catalog.loadTable(TableIdentifier.of(DATABASE, TABLE_NAME_UNPARTITIONED)); + upgradeToFormatVersion(icebergTableUnPartitioned, formatVersion); + + icebergTablePartitioned = catalog.loadTable(TableIdentifier.of(DATABASE, TABLE_NAME_PARTITIONED)); + upgradeToFormatVersion(icebergTablePartitioned, formatVersion); + } + + private void upgradeToFormatVersion(Table table, int version) { + if (version > 1) { + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(version)); + } + } + + private List sql(String query, Object... args) { + try (CloseableIterator iter = FlinkTestBase.exec(tEnv, query, args).collect()) { + return Lists.newArrayList(iter); + } catch (Exception e) { + throw new RuntimeException("Failed to collect table result", e); + } } @After public void clean() { - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED); - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); - super.clean(); + sql("DROP TABLE IF EXISTS %s.%s.%s", CATALOG, DATABASE, TABLE_NAME_UNPARTITIONED); + sql("DROP TABLE IF EXISTS %s.%s.%s", CATALOG, DATABASE, TABLE_NAME_PARTITIONED); + sql("DROP DATABASE IF EXISTS %s.%s", CATALOG, DATABASE); + sql("DROP CATALOG IF EXISTS %s", CATALOG); } @Test @@ -381,4 +452,152 @@ public void testRewriteAvoidRepeateCompress() throws IOException { expected.add(SimpleDataUtil.createRecord(2, "b")); SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected); } + + private OutputFile newOutputFile() { + return HadoopOutputFile.fromLocation(String.format("file://%s/%s.%s", + HADOOP_WAREHOUSE.getRoot().getAbsolutePath(), FILE_COUNT.incrementAndGet(), format), CONF); + } + + @Test + public void testRewriteDeleteInUnpartitionedTable() throws IOException { + Assume.assumeTrue("Iceberg format v1 does not support row-level delete.", formatVersion > 1); + sql("INSERT INTO %s VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC') ", TABLE_NAME_UNPARTITIONED); + sql("INSERT INTO %s VALUES (4, 'DDD'), (5, 'EEE'), (6, 'FFF') ", TABLE_NAME_UNPARTITIONED); + sql("INSERT INTO %s VALUES (7, 'GGG'), (8, 'HHH'), (9, 'III') ", TABLE_NAME_UNPARTITIONED); + + icebergTableUnPartitioned.refresh(); + + Schema deleteRowSchema = icebergTableUnPartitioned.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + + List deletions = 
Lists.newArrayList( + dataDelete.copy("data", "BBB"), // id = 2 + dataDelete.copy("data", "EEE"), // id = 5 + dataDelete.copy("data", "HHH") // id = 8 + ); + DeleteFile eqDeletes = FileHelpers.writeDeleteFile(icebergTableUnPartitioned, newOutputFile(), + deletions, deleteRowSchema); + + icebergTableUnPartitioned.newRowDelta() + .addDeletes(eqDeletes) + .commit(); + + assertSetsEqual(Lists.newArrayList( + Row.of(1, "AAA"), + Row.of(3, "CCC"), + Row.of(4, "DDD"), + Row.of(6, "FFF"), + Row.of(7, "GGG"), + Row.of(9, "III")), + sql("SELECT * FROM %s", TABLE_NAME_UNPARTITIONED) + ); + + List tasks = Lists.newArrayList(icebergTableUnPartitioned.newScan().planFiles()); + Assert.assertEquals("Should have 3 data files", 3, tasks.size()); + Set deleteFiles = tasks.stream() + .map(FileScanTask::deletes) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + Assert.assertEquals("Should have 1 equality delete files.", 1, deleteFiles.size()); + + // Rewrite the files. + RewriteDataFilesActionResult result = Actions.forTable(icebergTableUnPartitioned) + .rewriteDataFiles() + .execute(); + + Assert.assertEquals(3, result.deletedDataFiles().size()); + Assert.assertEquals(1, result.deletedDeleteFiles().size()); + Assert.assertEquals(1, result.addedDataFiles().size()); + Assert.assertEquals(0, result.addedDeleteFiles().size()); + Assert.assertEquals(1, Lists.newArrayList(icebergTableUnPartitioned.newScan().planFiles()).size()); + + // Assert rows in the final rewritten iceberg table. + assertSetsEqual(Lists.newArrayList( + Row.of(1, "AAA"), + Row.of(3, "CCC"), + Row.of(4, "DDD"), + Row.of(6, "FFF"), + Row.of(7, "GGG"), + Row.of(9, "III")), + sql("SELECT * FROM %s", TABLE_NAME_UNPARTITIONED) + ); + } + + @Test + public void testRewriteDeleteInPartitionedTable() throws IOException { + Assume.assumeTrue("Iceberg format v1 does not support row-level delete.", formatVersion > 1); + sql("INSERT INTO %s VALUES (1, 'AAA', 'p1'), (2, 'BBB', 'p2'), (3, 'CCC', 'p3') ", TABLE_NAME_PARTITIONED); + sql("INSERT INTO %s VALUES (4, 'AAA', 'p1'), (5, 'BBB', 'p2'), (6, 'CCC', 'p3') ", TABLE_NAME_PARTITIONED); + sql("INSERT INTO %s VALUES (7, 'AAA', 'p1'), (8, 'BBB', 'p2'), (9, 'CCC', 'p3') ", TABLE_NAME_PARTITIONED); + + icebergTablePartitioned.refresh(); + + Schema deleteRowSchema = icebergTablePartitioned.schema().select("id", "data", "spec"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + + List deletions = Lists.newArrayList( + dataDelete.copy("id", 4, "data", "AAA", "spec", "p1"), // id = 4 + dataDelete.copy("id", 5, "data", "BBB", "spec", "p2"), // id = 5 + dataDelete.copy("id", 6, "data", "CCC", "spec", "p3") // id = 6 + ); + + DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile(icebergTablePartitioned, newOutputFile(), + TestHelpers.Row.of("AAA", "p1"), deletions.subList(0, 1), deleteRowSchema); + + DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile(icebergTablePartitioned, newOutputFile(), + TestHelpers.Row.of("BBB", "p2"), deletions.subList(1, 2), deleteRowSchema); + + DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile(icebergTablePartitioned, newOutputFile(), + TestHelpers.Row.of("CCC", "p3"), deletions.subList(2, 3), deleteRowSchema); + + icebergTablePartitioned.newRowDelta() + .addDeletes(eqDeletes1) + .addDeletes(eqDeletes2) + .addDeletes(eqDeletes3) + .commit(); + + assertSetsEqual(Lists.newArrayList( + Row.of(1, "AAA", "p1"), + Row.of(2, "BBB", "p2"), + Row.of(3, "CCC", "p3"), + Row.of(7, "AAA", "p1"), + Row.of(8, "BBB", "p2"), + Row.of(9, "CCC", "p3")), + sql("SELECT * FROM %s", 
TABLE_NAME_PARTITIONED) + ); + + List tasks = Lists.newArrayList(icebergTablePartitioned.newScan().planFiles()); + Assert.assertEquals("Should have 9 data files", 9, tasks.size()); + Set deleteFiles = tasks.stream() + .map(FileScanTask::deletes) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + Assert.assertEquals("Should have 3 equality delete files.", 3, deleteFiles.size()); + + // Rewrite the files. + RewriteDataFilesActionResult result = Actions.forTable(icebergTablePartitioned) + .rewriteDataFiles() + .execute(); + + Assert.assertEquals(9, result.deletedDataFiles().size()); + Assert.assertEquals(3, result.deletedDeleteFiles().size()); + Assert.assertEquals(3, result.addedDataFiles().size()); + Assert.assertEquals(0, result.addedDeleteFiles().size()); + Assert.assertEquals(3, Lists.newArrayList(icebergTablePartitioned.newScan().planFiles()).size()); + + // Assert rows in the final rewritten iceberg table. + assertSetsEqual(Lists.newArrayList( + Row.of(1, "AAA", "p1"), + Row.of(2, "BBB", "p2"), + Row.of(3, "CCC", "p3"), + Row.of(7, "AAA", "p1"), + Row.of(8, "BBB", "p2"), + Row.of(9, "CCC", "p3")), + sql("SELECT * FROM %s", TABLE_NAME_PARTITIONED) + ); + } + + private void assertSetsEqual(Iterable expected, Iterable actual) { + Assert.assertEquals("Should have same elements", Sets.newHashSet(expected), Sets.newHashSet(actual)); + } } diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java index 7e6a81795a1f..9a3709379f66 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java @@ -21,10 +21,10 @@ import java.util.List; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.RewriteResult; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.source.RowDataRewriter; import org.apache.spark.api.java.JavaRDD; @@ -57,7 +57,7 @@ protected FileIO fileIO() { } @Override - protected List rewriteDataForTasks(List combinedScanTasks) { + protected RewriteResult rewriteDataForTasks(List combinedScanTasks) { JavaRDD taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size()); Broadcast io = sparkContext.broadcast(fileIO()); Broadcast encryption = sparkContext.broadcast(encryptionManager()); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java index 07c7e008bab6..6d60c90eb300 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java @@ -20,14 +20,12 @@ package org.apache.iceberg.spark.source; import java.io.Serializable; -import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.stream.Collectors; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -36,9 +34,9 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import 
org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.RewriteResult; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.TaskContext; @@ -82,15 +80,16 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive, this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); } - public List rewriteDataForTasks(JavaRDD taskRDD) { - JavaRDD> dataFilesRDD = taskRDD.map(this::rewriteDataForTask); + public RewriteResult rewriteDataForTasks(JavaRDD taskRDD) { + List rewriteResults = taskRDD.map(this::rewriteDataForTask) + .collect(); - return dataFilesRDD.collect().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + return RewriteResult.builder() + .merge(rewriteResults) + .build(); } - private List rewriteDataForTask(CombinedScanTask task) throws Exception { + private RewriteResult rewriteDataForTask(CombinedScanTask task) throws Exception { TaskContext context = TaskContext.get(); int partitionId = context.partitionId(); long taskId = context.taskAttemptId(); @@ -119,6 +118,12 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio structType); } + RewriteResult.Builder resultBuilder = RewriteResult.builder(); + for (FileScanTask scanTask : task.files()) { + resultBuilder.addDataFilesToDelete(scanTask.file()); + resultBuilder.addDeleteFilesToDelete(scanTask.deletes()); + } + try { while (dataReader.next()) { InternalRow row = dataReader.get(); @@ -129,7 +134,11 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio dataReader = null; writer.close(); - return Lists.newArrayList(writer.dataFiles()); + + // Add the data files only because deletions from delete files has been eliminated. 
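(As an aside on the new result type used here: each task describes what it consumed and produced through RewriteResult.Builder, and the driver later folds the per-task results with merge. A rough sketch against the builder API introduced in this patch; the file variables are placeholders, not real objects from the code:

    // One partition's rewrite: two small data files plus their applied delete file
    // are replaced by a single compacted data file.
    RewriteResult partitionResult = RewriteResult.builder()
        .addDataFilesToDelete(smallFileA, smallFileB)   // placeholder DataFiles being compacted
        .addDeleteFilesToDelete(eqDeleteFile)           // placeholder DeleteFile that has been applied
        .addDataFilesToAdd(compactedFile)               // placeholder DataFile written by the task
        .build();

Per-task results are then combined on the driver, as in rewriteDataForTasks above, via RewriteResult.builder().merge(results).build().)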
+ return resultBuilder + .addDataFilesToAdd(writer.dataFiles()) + .build(); } catch (Throwable originalThrowable) { try { diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java index 7b541de81ca3..95bb94264166 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java +++ b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java @@ -24,17 +24,30 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkTestBase; @@ -341,7 +354,7 @@ public void testRewriteDataFilesForLargeFile() throws AnalysisException { DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes)); Assert.assertEquals("Should have 3 files before rewrite", 3, dataFiles.size()); - spark.read().format("iceberg").load(tableLocation).createTempView("origin"); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("origin"); long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count(); List originalRecords = sql("SELECT * from origin sort by c2"); @@ -357,7 +370,7 @@ public void testRewriteDataFilesForLargeFile() throws AnalysisException { Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size()); Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size()); - spark.read().format("iceberg").load(tableLocation).createTempView("postRewrite"); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("postRewrite"); long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count(); List rewrittenRecords = sql("SELECT * from postRewrite sort by c2"); @@ -365,6 +378,171 @@ public void testRewriteDataFilesForLargeFile() throws AnalysisException { assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords); } + @Test + public void testRewriteDeletesInUnpartitionedTable() throws Exception { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = TABLES.create(SCHEMA, spec, ImmutableMap.of(), tableLocation); + Assert.assertNull("Table must be empty", table.currentSnapshot()); + upgradeToFormatV2(table); + + List txn1 = 
Lists.newArrayList( + new ThreeColumnRecord(0, "AAA", "AAA-0"), + new ThreeColumnRecord(1, "BBB", "BBB-1"), + new ThreeColumnRecord(2, "CCC", "CCC-2"), + new ThreeColumnRecord(3, "DDD", "DDD-3") + ); + writeRecords(txn1); + + List txn2 = Lists.newArrayList( + new ThreeColumnRecord(4, "EEE", "EEE-4"), + new ThreeColumnRecord(5, "FFF", "FFF-5") + ); + writeRecords(txn2); + + // Commit the txn to delete few rows. + Schema deleteRowSchema = table.schema().select("c2"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List deletions = Lists.newArrayList( + dataDelete.copy("c2", "BBB"), + dataDelete.copy("c2", "CCC"), + dataDelete.copy("c2", "DDD"), + dataDelete.copy("c2", "FFF") + ); + DeleteFile eqDeletes = FileHelpers.writeDeleteFile(table, newOutputFile(), deletions, deleteRowSchema); + table.newRowDelta().addDeletes(eqDeletes).commit(); + + // Read original records before action. + long originalNumRecords = 2; + List originalRecords = Lists.newArrayList( + new Object[] {0, "AAA", "AAA-0"}, + new Object[] {4, "EEE", "EEE-4"} + ); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("preRewrite"); + assertEquals("Rows should be as expected", originalRecords, sql("SELECT * from preRewrite sort by c2")); + + // Execute the rewrite files action. + table.refresh(); + RewriteDataFilesActionResult result = Actions.forTable(table) + .rewriteDataFiles() + .execute(); + + Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size()); + Assert.assertEquals("Action should delete 1 delete files", 1, result.deletedDeleteFiles().size()); + Assert.assertEquals("Action should add 1 data files", 1, result.addedDataFiles().size()); + Assert.assertEquals("Action should add 0 delete files", 0, result.addedDeleteFiles().size()); + + // Read rewritten records after action. + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("postRewrite"); + long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count(); + List rewrittenRecords = sql("SELECT * from postRewrite sort by c2"); + + Assert.assertEquals(originalNumRecords, postRewriteNumRecords); + assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords); + } + + @Test + public void testRewriteDeletesInPartitionedTable() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .truncate("c2", 2) + .build(); + Table table = TABLES.create(SCHEMA, spec, ImmutableMap.of(), tableLocation); + upgradeToFormatV2(table); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"), + new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD") + ); + writeRecords(records2); + + List records3 = Lists.newArrayList( + new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"), + new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG") + ); + writeRecords(records3); + + List records4 = Lists.newArrayList( + new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH") + ); + writeRecords(records4); + + // Commit the txn to delete few rows. 
+ Schema deleteRowSchema = table.schema().select("c1", "c2", "c3"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List deletions = Lists.newArrayList( + dataDelete.copy("c1", 1, "c2", "AAAAAAAAAA", "c3", "CCCC"), + dataDelete.copy("c1", 1, "c2", "BBBBBBBBBB", "c3", "DDDD"), + dataDelete.copy("c1", 2, "c2", "AAAAAAAAAA", "c3", "GGGG"), + dataDelete.copy("c1", 2, "c2", "BBBBBBBBBB", "c3", "HHHH") + ); + DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile(table, newOutputFile(), + TestHelpers.Row.of(1, "AA"), deletions.subList(0, 1), deleteRowSchema); + DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile(table, newOutputFile(), + TestHelpers.Row.of(1, "BB"), deletions.subList(1, 2), deleteRowSchema); + DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile(table, newOutputFile(), + TestHelpers.Row.of(2, "AA"), deletions.subList(2, 3), deleteRowSchema); + DeleteFile eqDeletes4 = FileHelpers.writeDeleteFile(table, newOutputFile(), + TestHelpers.Row.of(2, "BB"), deletions.subList(3, 4), deleteRowSchema); + + table.newRowDelta() + .addDeletes(eqDeletes1) + .addDeletes(eqDeletes2) + .addDeletes(eqDeletes3) + .addDeletes(eqDeletes4) + .commit(); + + // Read original records before action. + long originalNumRecords = 4; + List originalRecords = Lists.newArrayList( + new Object[] {1, "AAAAAAAAAA", "AAAA"}, + new Object[] {1, "BBBBBBBBBB", "BBBB"}, + new Object[] {2, "AAAAAAAAAA", "EEEE"}, + new Object[] {2, "BBBBBBBBBB", "FFFF"} + ); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("preRewrite"); + assertEquals("Rows should be as expected", originalRecords, sql("SELECT * from preRewrite sort by c3")); + + // Execute the rewrite files action. + table.refresh(); + RewriteDataFilesActionResult result = Actions.forTable(table) + .rewriteDataFiles() + .execute(); + + Assert.assertEquals("Action should delete 8 data files", 8, result.deletedDataFiles().size()); + Assert.assertEquals("Action should delete 4 delete files", 4, result.deletedDeleteFiles().size()); + Assert.assertEquals("Action should add 4 data files", 4, result.addedDataFiles().size()); + Assert.assertEquals("Action should add 0 delete files", 0, result.addedDeleteFiles().size()); + + // Read rewritten records after action. + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("postRewrite"); + long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count(); + List rewrittenRecords = sql("SELECT * from postRewrite sort by c3"); + + Assert.assertEquals(originalNumRecords, postRewriteNumRecords); + assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords); + } + + private static Table upgradeToFormatV2(Table table) { + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + return table; + } + + private static final AtomicInteger FILE_COUNT = new AtomicInteger(1); + + private OutputFile newOutputFile() { + return HadoopOutputFile.fromLocation(String.format("%s/%s.%s", tableLocation, + FILE_COUNT.incrementAndGet(), FileFormat.PARQUET), new Configuration()); + } + private void writeRecords(List records) { Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); writeDF(df);
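
Taken together, both the Flink and Spark tests exercise the same entry point. A minimal usage sketch of the extended action result, assuming an already-loaded Table named table and the engine-specific Actions class used in the tests:

    RewriteDataFilesActionResult result = Actions.forTable(table)
        .rewriteDataFiles()
        .execute();

    // All four collections are now exposed: the rewritten originals and their applied delete files
    // are removed, and only newly compacted data files are added (the action produces no delete files yet).
    System.out.printf("deleted: %d data / %d delete files, added: %d data / %d delete files%n",
        result.deletedDataFiles().size(),
        result.deletedDeleteFiles().size(),
        result.addedDataFiles().size(),
        result.addedDeleteFiles().size());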