diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java new file mode 100644 index 000000000000..343ebfb8dc16 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.math.RoundingMode; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.RewriteJobOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles; +import org.apache.iceberg.actions.RewritePositionDeleteFiles; +import org.apache.iceberg.actions.RewritePositionDeletesCommitManager; +import org.apache.iceberg.actions.RewritePositionDeletesCommitManager.CommitService; +import org.apache.iceberg.actions.RewritePositionDeletesGroup; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Queues; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.iceberg.util.PropertyUtil; 
+import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Spark implementation of {@link RewritePositionDeleteFiles}. */ +public class RewritePositionDeleteFilesSparkAction + extends BaseSnapshotUpdateSparkAction + implements RewritePositionDeleteFiles { + + private static final Logger LOG = + LoggerFactory.getLogger(RewritePositionDeleteFilesSparkAction.class); + private static final Set VALID_OPTIONS = + ImmutableSet.of( + MAX_CONCURRENT_FILE_GROUP_REWRITES, + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_MAX_COMMITS, + REWRITE_JOB_ORDER); + private static final Result EMPTY_RESULT = + ImmutableRewritePositionDeleteFiles.Result.builder().build(); + + private final Table table; + private final SparkBinPackPositionDeletesRewriter rewriter; + + private int maxConcurrentFileGroupRewrites; + private int maxCommits; + private boolean partialProgressEnabled; + private RewriteJobOrder rewriteJobOrder; + + RewritePositionDeleteFilesSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table); + } + + @Override + protected RewritePositionDeleteFilesSparkAction self() { + return this; + } + + @Override + public RewritePositionDeleteFilesSparkAction filter(Expression expression) { + throw new UnsupportedOperationException("Regular filters not supported yet."); + } + + @Override + public Result execute() { + if (table.currentSnapshot() == null) { + LOG.info("Nothing found to rewrite in empty table {}", table.name()); + return EMPTY_RESULT; + } + + validateAndInitOptions(); + + StructLikeMap>> fileGroupsByPartition = planFileGroups(); + RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition); + + if (ctx.totalGroupCount() == 0) { + LOG.info("Nothing found to rewrite in {}", table.name()); + return EMPTY_RESULT; + } + + Stream groupStream = toGroupStream(ctx, fileGroupsByPartition); + + if (partialProgressEnabled) { + return doExecuteWithPartialProgress(ctx, groupStream, commitManager()); + } else { + return doExecute(ctx, groupStream, commitManager()); + } + } + + private StructLikeMap>> planFileGroups() { + CloseableIterable fileTasks = planFiles(); + + try { + StructType partitionType = Partitioning.partitionType(table); + StructLikeMap> fileTasksByPartition = + groupByPartition(partitionType, fileTasks); + return fileGroupsByPartition(fileTasksByPartition); + } finally { + try { + fileTasks.close(); + } catch (IOException io) { + LOG.error("Cannot properly close file iterable while planning for rewrite", io); + } + } + } + + private CloseableIterable planFiles() { + Table deletesTable = + MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); + + return CloseableIterable.transform( + deletesTable.newBatchScan().ignoreResiduals().planFiles(), + task -> (PositionDeletesScanTask) task); + } + + private StructLikeMap> groupByPartition( + StructType partitionType, Iterable tasks) { + StructLikeMap> filesByPartition = + StructLikeMap.create(partitionType); + + for (PositionDeletesScanTask task : tasks) { + StructLike coerced = coercePartition(task, partitionType); + + List partitionTasks = filesByPartition.get(coerced); + if (partitionTasks == null) { + partitionTasks = Lists.newArrayList(); + } + partitionTasks.add(task); + filesByPartition.put(coerced, partitionTasks); + } + + return filesByPartition; + } + + 
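+  // Each partition's position delete scan tasks are bin-packed into file groups by the
+  // size-based rewriter; every group is then rewritten independently and committed either in a
+  // single operation or incrementally when partial progress is enabled.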
private StructLikeMap>> fileGroupsByPartition( + StructLikeMap> filesByPartition) { + return filesByPartition.transformValues(this::planFileGroups); + } + + private List> planFileGroups(List tasks) { + return ImmutableList.copyOf(rewriter.planFileGroups(tasks)); + } + + private RewritePositionDeletesGroup rewriteDeleteFiles( + RewriteExecutionContext ctx, RewritePositionDeletesGroup fileGroup) { + String desc = jobDesc(fileGroup, ctx); + Set addedFiles = + withJobGroupInfo( + newJobGroupInfo("REWRITE-POSITION-DELETES", desc), + () -> rewriter.rewrite(fileGroup.tasks())); + + fileGroup.setOutputFiles(addedFiles); + LOG.info("Rewrite position deletes ready to be committed - {}", desc); + return fileGroup; + } + + private ExecutorService rewriteService() { + return MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) + Executors.newFixedThreadPool( + maxConcurrentFileGroupRewrites, + new ThreadFactoryBuilder() + .setNameFormat("Rewrite-Position-Delete-Service-%d") + .build())); + } + + private RewritePositionDeletesCommitManager commitManager() { + return new RewritePositionDeletesCommitManager(table); + } + + private Result doExecute( + RewriteExecutionContext ctx, + Stream groupStream, + RewritePositionDeletesCommitManager commitManager) { + ExecutorService rewriteService = rewriteService(); + + ConcurrentLinkedQueue rewrittenGroups = + Queues.newConcurrentLinkedQueue(); + + Tasks.Builder rewriteTaskBuilder = + Tasks.foreach(groupStream) + .executeWith(rewriteService) + .stopOnFailure() + .noRetry() + .onFailure( + (fileGroup, exception) -> + LOG.warn( + "Failure during rewrite process for group {}", + fileGroup.info(), + exception)); + + try { + rewriteTaskBuilder.run(fileGroup -> rewrittenGroups.add(rewriteDeleteFiles(ctx, fileGroup))); + } catch (Exception e) { + // At least one rewrite group failed, clean up all completed rewrites + LOG.error( + "Cannot complete rewrite, {} is not enabled and one of the file set groups failed to " + + "be rewritten. This error occurred during the writing of new files, not during the commit process. This " + + "indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling " + + "{} may help in this case but the root cause should be investigated. Cleaning up {} groups which finished " + + "being written.", + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_ENABLED, + rewrittenGroups.size(), + e); + + Tasks.foreach(rewrittenGroups).suppressFailureWhenFinished().run(commitManager::abort); + throw e; + } finally { + rewriteService.shutdown(); + } + + try { + commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups)); + } catch (ValidationException | CommitFailedException e) { + String errorMessage = + String.format( + "Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that " + + "this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of " + + "conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. " + + "Separate smaller rewrite commits can succeed independently while any commits that conflict with " + + "another Iceberg operation will be ignored. 
This mode will create additional snapshots in the table " + + "history, one for each commit.", + PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS); + throw new RuntimeException(errorMessage, e); + } + + List rewriteResults = + rewrittenGroups.stream() + .map(RewritePositionDeletesGroup::asResult) + .collect(Collectors.toList()); + + return ImmutableRewritePositionDeleteFiles.Result.builder() + .rewriteResults(rewriteResults) + .build(); + } + + private Result doExecuteWithPartialProgress( + RewriteExecutionContext ctx, + Stream groupStream, + RewritePositionDeletesCommitManager commitManager) { + ExecutorService rewriteService = rewriteService(); + + // start commit service + int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING); + CommitService commitService = commitManager.service(groupsPerCommit); + commitService.start(); + + // start rewrite tasks + Tasks.foreach(groupStream) + .suppressFailureWhenFinished() + .executeWith(rewriteService) + .noRetry() + .onFailure( + (fileGroup, exception) -> + LOG.error("Failure during rewrite group {}", fileGroup.info(), exception)) + .run(fileGroup -> commitService.offer(rewriteDeleteFiles(ctx, fileGroup))); + rewriteService.shutdown(); + + // stop commit service + commitService.close(); + List commitResults = commitService.results(); + if (commitResults.size() == 0) { + LOG.error( + "{} is true but no rewrite commits succeeded. Check the logs to determine why the individual " + + "commits failed. If this is persistent it may help to increase {} which will break the rewrite operation " + + "into smaller commits.", + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_MAX_COMMITS); + } + + List rewriteResults = + commitResults.stream() + .map(RewritePositionDeletesGroup::asResult) + .collect(Collectors.toList()); + return ImmutableRewritePositionDeleteFiles.Result.builder() + .rewriteResults(rewriteResults) + .build(); + } + + private Stream toGroupStream( + RewriteExecutionContext ctx, + Map>> groupsByPartition) { + return groupsByPartition.entrySet().stream() + .filter(e -> e.getValue().size() != 0) + .flatMap( + e -> { + StructLike partition = e.getKey(); + List> scanGroups = e.getValue(); + return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks)); + }) + .sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder)); + } + + private RewritePositionDeletesGroup newRewriteGroup( + RewriteExecutionContext ctx, StructLike partition, List tasks) { + int globalIndex = ctx.currentGlobalIndex(); + int partitionIndex = ctx.currentPartitionIndex(partition); + FileGroupInfo info = + ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder() + .globalIndex(globalIndex) + .partitionIndex(partitionIndex) + .partition(partition) + .build(); + return new RewritePositionDeletesGroup(info, tasks); + } + + private void validateAndInitOptions() { + Set validOptions = Sets.newHashSet(rewriter.validOptions()); + validOptions.addAll(VALID_OPTIONS); + + Set invalidKeys = Sets.newHashSet(options().keySet()); + invalidKeys.removeAll(validOptions); + + Preconditions.checkArgument( + invalidKeys.isEmpty(), + "Cannot use options %s, they are not supported by the action or the rewriter %s", + invalidKeys, + rewriter.description()); + + rewriter.init(options()); + + this.maxConcurrentFileGroupRewrites = + PropertyUtil.propertyAsInt( + options(), + MAX_CONCURRENT_FILE_GROUP_REWRITES, + MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT); + + this.maxCommits = + PropertyUtil.propertyAsInt( + options(), 
PARTIAL_PROGRESS_MAX_COMMITS, PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT); + + this.partialProgressEnabled = + PropertyUtil.propertyAsBoolean( + options(), PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED_DEFAULT); + + this.rewriteJobOrder = + RewriteJobOrder.fromName( + PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER, REWRITE_JOB_ORDER_DEFAULT)); + + Preconditions.checkArgument( + maxConcurrentFileGroupRewrites >= 1, + "Cannot set %s to %s, the value must be positive.", + MAX_CONCURRENT_FILE_GROUP_REWRITES, + maxConcurrentFileGroupRewrites); + + Preconditions.checkArgument( + !partialProgressEnabled || maxCommits > 0, + "Cannot set %s to %s, the value must be positive when %s is true", + PARTIAL_PROGRESS_MAX_COMMITS, + maxCommits, + PARTIAL_PROGRESS_ENABLED); + } + + private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) { + StructLike partition = group.info().partition(); + if (partition.size() > 0) { + return String.format( + "Rewriting %d position delete files (%s, file group %d/%d, %s (%d/%d)) in %s", + group.rewrittenDeleteFiles().size(), + rewriter.description(), + group.info().globalIndex(), + ctx.totalGroupCount(), + partition, + group.info().partitionIndex(), + ctx.groupsInPartition(partition), + table.name()); + } else { + return String.format( + "Rewriting %d position files (%s, file group %d/%d) in %s", + group.rewrittenDeleteFiles().size(), + rewriter.description(), + group.info().globalIndex(), + ctx.totalGroupCount(), + table.name()); + } + } + + static class RewriteExecutionContext { + private final StructLikeMap numGroupsByPartition; + private final int totalGroupCount; + private final Map partitionIndexMap; + private final AtomicInteger groupIndex; + + RewriteExecutionContext( + StructLikeMap>> fileTasksByPartition) { + this.numGroupsByPartition = fileTasksByPartition.transformValues(List::size); + this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0); + this.partitionIndexMap = Maps.newConcurrentMap(); + this.groupIndex = new AtomicInteger(1); + } + + public int currentGlobalIndex() { + return groupIndex.getAndIncrement(); + } + + public int currentPartitionIndex(StructLike partition) { + return partitionIndexMap.merge(partition, 1, Integer::sum); + } + + public int groupsInPartition(StructLike partition) { + return numGroupsByPartition.get(partition); + } + + public int totalGroupCount() { + return totalGroupCount; + } + } + + private StructLike coercePartition(PositionDeletesScanTask task, StructType partitionType) { + return PartitionUtil.coercePartition(partitionType, task.spec(), task.partition()); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index 8c886adf510e..fb67ded96e35 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -91,4 +91,9 @@ public ExpireSnapshotsSparkAction expireSnapshots(Table table) { public DeleteReachableFilesSparkAction deleteReachableFiles(String metadataLocation) { return new DeleteReachableFilesSparkAction(spark, metadataLocation); } + + @Override + public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table) { + return new RewritePositionDeleteFilesSparkAction(spark, table); + } } diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java new file mode 100644 index 000000000000..51c4cc661f4e --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.MetadataTableType.POSITION_DELETES; +import static org.apache.spark.sql.functions.col; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.IntStream; +import org.apache.iceberg.DataFilesTable; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.SizeBasedPositionDeletesRewriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTableCache; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; + +class SparkBinPackPositionDeletesRewriter extends SizeBasedPositionDeletesRewriter { + + private final SparkSession spark; + private final SparkTableCache tableCache = SparkTableCache.get(); + private final ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); + private final PositionDeletesRewriteCoordinator coordinator = + PositionDeletesRewriteCoordinator.get(); + + SparkBinPackPositionDeletesRewriter(SparkSession spark, Table table) { + super(table); + // Disable Adaptive Query Execution as this may change the output partitioning of our write + this.spark = spark.cloneSession(); + this.spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false); + } + + @Override + public String description() { + return "BIN-PACK"; + } + + @Override + public Set rewrite(List group) { + String groupId = UUID.randomUUID().toString(); + Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table(), POSITION_DELETES); + try { + tableCache.add(groupId, deletesTable); + 
taskSetManager.stageTasks(deletesTable, groupId, group); + + doRewrite(groupId, group); + + return coordinator.fetchNewFiles(deletesTable, groupId); + } finally { + tableCache.remove(groupId); + taskSetManager.removeTasks(deletesTable, groupId); + coordinator.clearRewrite(deletesTable, groupId); + } + } + + protected void doRewrite(String groupId, List group) { + // all position deletes are of the same partition, because they are in same file group + Preconditions.checkArgument(group.size() > 0, "Empty group"); + Types.StructType partitionType = group.get(0).spec().partitionType(); + StructLike partition = group.get(0).partition(); + + // read the deletes packing them into splits of the required size + Dataset posDeletes = + spark + .read() + .format("iceberg") + .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) + .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group))) + .option(SparkReadOptions.FILE_OPEN_COST, "0") + .load(groupId); + + // keep only valid position deletes + Dataset dataFiles = dataFiles(partitionType, partition); + Column joinCond = posDeletes.col("file_path").equalTo(dataFiles.col("file_path")); + Dataset validDeletes = posDeletes.join(dataFiles, joinCond, "leftsemi"); + + // write the packed deletes into new files where each split becomes a new file + validDeletes + .sortWithinPartitions("file_path", "pos") + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) + .option(SparkWriteOptions.TARGET_DELETE_FILE_SIZE_BYTES, writeMaxFileSize()) + .mode("append") + .save(groupId); + } + + /** Returns entries of {@link DataFilesTable} of specified partition */ + private Dataset dataFiles(Types.StructType partitionType, StructLike partition) { + List fields = partitionType.fields(); + Optional condition = + IntStream.range(0, fields.size()) + .mapToObj( + i -> { + Class type = fields.get(i).type().typeId().javaClass(); + Object value = partition.get(i, type); + Column col = col("partition." + fields.get(i).name()); + return col.equalTo(value); + }) + .reduce(Column::and); + if (condition.isPresent()) { + return SparkTableUtil.loadMetadataTable(spark, table(), MetadataTableType.DATA_FILES) + .filter(condition.get()); + } else { + return SparkTableUtil.loadMetadataTable(spark, table(), MetadataTableType.DATA_FILES); + } + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java new file mode 100644 index 000000000000..5c0aa2e6aa43 --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -0,0 +1,831 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult; +import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result; +import org.apache.iceberg.actions.SizeBasedFileRewriter; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.source.FourColumnRecord; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeMap; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runners.Parameterized; + +public class TestRewritePositionDeleteFilesAction extends SparkCatalogTestBase { + + private static final String TABLE_NAME = "test_table"; + private static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + private static final Map CATALOG_PROPS = + ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "cache-enabled", "false"); + + private static final int SCALE = 4000; + private static final int DELETES_SCALE = 1000; + + @Parameterized.Parameters( + name = + "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + FileFormat.PARQUET + } + }; + } + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + private final FileFormat 
format; + + public TestRewritePositionDeleteFilesAction( + String catalogName, String implementation, Map config, FileFormat format) { + super(catalogName, implementation, config); + this.format = format; + } + + @After + public void cleanup() { + validationCatalog.dropTable(TableIdentifier.of("default", TABLE_NAME)); + } + + @Test + public void testEmptyTable() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), SCHEMA, spec, tableProperties()); + + Result result = SparkActions.get(spark).rewritePositionDeletes(table).execute(); + Assert.assertEquals("No rewritten delete files", 0, result.rewrittenDeleteFilesCount()); + Assert.assertEquals("No added delete files", 0, result.addedDeleteFilesCount()); + } + + @Test + public void testUnpartitioned() throws Exception { + Table table = createTableUnpartitioned(2, SCALE); + List dataFiles = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + Assert.assertEquals(2, dataFiles.size()); + + List deleteFiles = deleteFiles(table); + Assert.assertEquals(2, deleteFiles.size()); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + Assert.assertEquals(2000, expectedRecords.size()); + Assert.assertEquals(2000, expectedDeletes.size()); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Expected 1 new delete file", 1, newDeleteFiles.size()); + assertLocallySorted(newDeleteFiles); + assertNotContains(deleteFiles, newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 1); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + } + + @Test + public void testRewriteAll() throws Exception { + Table table = createTablePartitioned(4, 2, SCALE); + + List dataFiles = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + Assert.assertEquals(4, dataFiles.size()); + + List deleteFiles = deleteFiles(table); + Assert.assertEquals(8, deleteFiles.size()); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + Assert.assertEquals(12000, expectedRecords.size()); + Assert.assertEquals(4000, expectedDeletes.size()); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .execute(); + + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 4 delete files", 4, newDeleteFiles.size()); + assertNotContains(deleteFiles, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 4); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + } + + @Test + public void testRewriteToSmallerTarget() throws Exception { + Table table = createTablePartitioned(4, 2, 
SCALE); + + List dataFiles = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + Assert.assertEquals(4, dataFiles.size()); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + Assert.assertEquals(12000, expectedRecords.size()); + Assert.assertEquals(4000, expectedDeletes.size()); + + List deleteFiles = deleteFiles(table); + Assert.assertEquals(8, deleteFiles.size()); + + long avgSize = size(deleteFiles) / deleteFiles.size(); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, String.valueOf(avgSize / 2)) + .execute(); + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 8 new delete files", 8, newDeleteFiles.size()); + assertNotContains(deleteFiles, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 4); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + } + + @Test + public void testRemoveDanglingDeletes() throws Exception { + Table table = createTablePartitioned(4, 2, SCALE); + + List dataFiles = dataFiles(table); + writePosDeletesForFiles( + table, + 2, + DELETES_SCALE, + dataFiles, + true /* Disable commit-time ManifestFilterManager removal of dangling deletes */); + + Assert.assertEquals(4, dataFiles.size()); + + List deleteFiles = deleteFiles(table); + Assert.assertEquals(8, deleteFiles.size()); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + Assert.assertEquals(12000, expectedRecords.size()); + Assert.assertEquals(4000, expectedDeletes.size()); + + SparkActions.get(spark) + .rewriteDataFiles(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 0 new delete files", 0, newDeleteFiles.size()); + assertNotContains(deleteFiles, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 4); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + Assert.assertEquals("Should be no new position deletes", 0, actualDeletes.size()); + } + + @Test + public void testSomePartitionsDanglingDeletes() throws Exception { + Table table = createTablePartitioned(4, 2, SCALE); + + List dataFiles = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + Assert.assertEquals(4, dataFiles.size()); + + List deleteFiles = deleteFiles(table); + Assert.assertEquals(8, deleteFiles.size()); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + Assert.assertEquals(12000, expectedRecords.size()); + Assert.assertEquals(4000, expectedDeletes.size()); + + // Rewrite half the data files + Expression filter = Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1)); + SparkActions.get(spark) + .rewriteDataFiles(table) + .filter(filter) + 
.option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 2 new delete files", 2, newDeleteFiles.size()); + assertNotContains(deleteFiles, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 4); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + // As only half the files have been rewritten, + // we expect to retain position deletes only for those not rewritten + expectedDeletes = + expectedDeletes.stream() + .filter( + r -> { + Object[] partition = (Object[]) r[3]; + return partition[0] == (Integer) 2 || partition[0] == (Integer) 3; + }) + .collect(Collectors.toList()); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + } + + @Test + public void testPartitionEvolutionAdd() throws Exception { + Table table = createTableUnpartitioned(2, SCALE); + List unpartitionedDataFiles = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, unpartitionedDataFiles); + Assert.assertEquals(2, unpartitionedDataFiles.size()); + + List unpartitionedDeleteFiles = deleteFiles(table); + Assert.assertEquals(2, unpartitionedDeleteFiles.size()); + + List expectedUnpartitionedDeletes = deleteRecords(table); + List expectedUnpartitionedRecords = records(table); + Assert.assertEquals(2000, expectedUnpartitionedRecords.size()); + Assert.assertEquals(2000, expectedUnpartitionedDeletes.size()); + + table.updateSpec().addField("c1").commit(); + writeRecords(table, 2, SCALE, 2); + List partitionedDataFiles = except(dataFiles(table), unpartitionedDataFiles); + writePosDeletesForFiles(table, 2, DELETES_SCALE, partitionedDataFiles); + Assert.assertEquals(2, partitionedDataFiles.size()); + + List partitionedDeleteFiles = except(deleteFiles(table), unpartitionedDeleteFiles); + Assert.assertEquals(4, partitionedDeleteFiles.size()); + + List expectedDeletes = deleteRecords(table); + List expectedRecords = records(table); + Assert.assertEquals(4000, expectedDeletes.size()); + Assert.assertEquals(8000, expectedRecords.size()); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + + List rewrittenDeleteFiles = + Stream.concat(unpartitionedDeleteFiles.stream(), partitionedDeleteFiles.stream()) + .collect(Collectors.toList()); + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 3 new delete files", 3, newDeleteFiles.size()); + assertNotContains(rewrittenDeleteFiles, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, rewrittenDeleteFiles, newDeleteFiles, 3); + checkSequenceNumbers(table, rewrittenDeleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + } + + @Test + public void testPartitionEvolutionRemove() throws Exception { + Table table = createTablePartitioned(2, 2, SCALE); + List dataFilesUnpartitioned = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, 
dataFilesUnpartitioned); + Assert.assertEquals(2, dataFilesUnpartitioned.size()); + + List deleteFilesUnpartitioned = deleteFiles(table); + Assert.assertEquals(4, deleteFilesUnpartitioned.size()); + + table.updateSpec().removeField("c1").commit(); + + writeRecords(table, 2, SCALE); + List dataFilesPartitioned = except(dataFiles(table), dataFilesUnpartitioned); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFilesPartitioned); + Assert.assertEquals(2, dataFilesPartitioned.size()); + + List deleteFilesPartitioned = except(deleteFiles(table), deleteFilesUnpartitioned); + Assert.assertEquals(2, deleteFilesPartitioned.size()); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + Assert.assertEquals(4000, expectedDeletes.size()); + Assert.assertEquals(8000, expectedRecords.size()); + + List expectedRewritten = deleteFiles(table); + Assert.assertEquals(6, expectedRewritten.size()); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 3 new delete files", 3, newDeleteFiles.size()); + assertNotContains(expectedRewritten, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, expectedRewritten, newDeleteFiles, 3); + checkSequenceNumbers(table, expectedRewritten, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + } + + @Test + public void testSchemaEvolution() throws Exception { + Table table = createTablePartitioned(2, 2, SCALE); + List dataFiles = dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + Assert.assertEquals(2, dataFiles.size()); + + List deleteFiles = deleteFiles(table); + Assert.assertEquals(4, deleteFiles.size()); + + table.updateSchema().addColumn("c4", Types.StringType.get()).commit(); + writeNewSchemaRecords(table, 2, SCALE, 2, 2); + + int newColId = table.schema().findField("c4").fieldId(); + List newSchemaDataFiles = + dataFiles(table).stream() + .filter(f -> f.upperBounds().containsKey(newColId)) + .collect(Collectors.toList()); + writePosDeletesForFiles(table, 2, DELETES_SCALE, newSchemaDataFiles); + + List newSchemaDeleteFiles = except(deleteFiles(table), deleteFiles); + Assert.assertEquals(4, newSchemaDeleteFiles.size()); + + table.refresh(); + List expectedDeletes = deleteRecords(table); + List expectedRecords = records(table); + Assert.assertEquals(4000, expectedDeletes.size()); // 4 files * 1000 per file + Assert.assertEquals(12000, expectedRecords.size()); // 4 * 4000 - 4000 + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + + List rewrittenDeleteFiles = + Stream.concat(deleteFiles.stream(), newSchemaDeleteFiles.stream()) + .collect(Collectors.toList()); + List newDeleteFiles = deleteFiles(table); + Assert.assertEquals("Should have 2 new delete files", 4, newDeleteFiles.size()); + assertNotContains(rewrittenDeleteFiles, newDeleteFiles); + assertLocallySorted(newDeleteFiles); + checkResult(result, rewrittenDeleteFiles, newDeleteFiles, 4); + checkSequenceNumbers(table, rewrittenDeleteFiles, newDeleteFiles); + + List actualRecords = records(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + 
} + + private Table createTablePartitioned(int partitions, int files, int numRecords) { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), SCHEMA, spec, tableProperties()); + + writeRecords(table, files, numRecords, partitions); + return table; + } + + private Table createTableUnpartitioned(int files, int numRecords) { + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), + SCHEMA, + PartitionSpec.unpartitioned(), + tableProperties()); + + writeRecords(table, files, numRecords); + return table; + } + + private Map tableProperties() { + return ImmutableMap.of( + TableProperties.DEFAULT_WRITE_METRICS_MODE, + "full", + TableProperties.FORMAT_VERSION, + "2", + TableProperties.DEFAULT_FILE_FORMAT, + format.toString()); + } + + private void writeRecords(Table table, int files, int numRecords) { + writeRecords(table, files, numRecords, 1); + } + + private void writeRecords(Table table, int files, int numRecords, int numPartitions) { + writeRecordsWithPartitions( + table, + files, + numRecords, + IntStream.range(0, numPartitions).mapToObj(ImmutableList::of).collect(Collectors.toList())); + } + + private void writeRecordsWithPartitions( + Table table, int files, int numRecords, List> partitions) { + int partitionTypeSize = table.spec().partitionType().fields().size(); + Assert.assertTrue( + "This method currently supports only two columns as partition columns", + partitionTypeSize <= 2); + BiFunction, ThreeColumnRecord> recordFunction = + (i, partValues) -> { + switch (partitionTypeSize) { + case (0): + return new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i)); + case (1): + return new ThreeColumnRecord(partValues.get(0), String.valueOf(i), String.valueOf(i)); + case (2): + return new ThreeColumnRecord( + partValues.get(0), String.valueOf(partValues.get(1)), String.valueOf(i)); + default: + throw new ValidationException( + "This method currently supports only two columns as partition columns"); + } + }; + List records = + partitions.stream() + .flatMap( + partition -> + IntStream.range(0, numRecords) + .mapToObj(i -> recordFunction.apply(i, partition))) + .collect(Collectors.toList()); + spark + .createDataFrame(records, ThreeColumnRecord.class) + .repartition(files) + .write() + .format("iceberg") + .mode("append") + .save(name(table)); + table.refresh(); + } + + private void writeNewSchemaRecords( + Table table, int files, int numRecords, int startingPartition, int partitions) { + List records = + IntStream.range(startingPartition, startingPartition + partitions) + .boxed() + .flatMap( + partition -> + IntStream.range(0, numRecords) + .mapToObj( + i -> + new FourColumnRecord( + partition, + String.valueOf(i), + String.valueOf(i), + String.valueOf(i)))) + .collect(Collectors.toList()); + spark + .createDataFrame(records, FourColumnRecord.class) + .repartition(files) + .write() + .format("iceberg") + .mode("append") + .save(name(table)); + } + + private List records(Table table) { + return rowsToJava( + spark.read().format("iceberg").load(name(table)).sort("c1", "c2", "c3").collectAsList()); + } + + private List deleteRecords(Table table) { + String[] additionalFields; + // do not select delete_file_path for comparison + // as delete files have been rewritten + if (table.spec().isUnpartitioned()) { + additionalFields = new String[] {"pos", "row"}; + } else { + additionalFields = new String[] {"pos", "row", "partition", 
"spec_id"}; + } + return rowsToJava( + spark + .read() + .format("iceberg") + .load(name(table) + ".position_deletes") + .select("file_path", additionalFields) + .sort("file_path", "pos") + .collectAsList()); + } + + private void writePosDeletesForFiles( + Table table, int deleteFilesPerPartition, int deletesPerDataFile, List files) + throws IOException { + writePosDeletesForFiles(table, deleteFilesPerPartition, deletesPerDataFile, files, false); + } + + private void writePosDeletesForFiles( + Table table, + int deleteFilesPerPartition, + int deletesPerDataFile, + List files, + boolean transactional) + throws IOException { + + Map> filesByPartition = + files.stream().collect(Collectors.groupingBy(ContentFile::partition)); + List deleteFiles = + Lists.newArrayListWithCapacity(deleteFilesPerPartition * filesByPartition.size()); + + for (Map.Entry> filesByPartitionEntry : + filesByPartition.entrySet()) { + + StructLike partition = filesByPartitionEntry.getKey(); + List partitionFiles = filesByPartitionEntry.getValue(); + + int deletesForPartition = partitionFiles.size() * deletesPerDataFile; + Assert.assertEquals( + "Number of delete files per partition should be " + + "evenly divisible by requested deletes per data file times number of data files in this partition", + 0, + deletesForPartition % deleteFilesPerPartition); + int deleteFileSize = deletesForPartition / deleteFilesPerPartition; + + int counter = 0; + List> deletes = Lists.newArrayList(); + for (DataFile partitionFile : partitionFiles) { + for (int deletePos = 0; deletePos < deletesPerDataFile; deletePos++) { + deletes.add(Pair.of(partitionFile.path(), (long) deletePos)); + counter++; + if (counter == deleteFileSize) { + // Dump to file and reset variables + OutputFile output = Files.localOutput(temp.newFile()); + deleteFiles.add(FileHelpers.writeDeleteFile(table, output, partition, deletes).first()); + counter = 0; + deletes.clear(); + } + } + } + } + + if (transactional) { + RowDelta rowDelta = table.newRowDelta(); + deleteFiles.forEach(rowDelta::addDeletes); + rowDelta.commit(); + } else { + deleteFiles.forEach( + deleteFile -> { + RowDelta rowDelta = table.newRowDelta(); + rowDelta.addDeletes(deleteFile); + rowDelta.commit(); + }); + } + } + + private List dataFiles(Table table) { + CloseableIterable tasks = table.newScan().includeColumnStats().planFiles(); + return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); + } + + private List deleteFiles(Table table) { + Table deletesTable = + MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); + CloseableIterable tasks = deletesTable.newBatchScan().planFiles(); + return Lists.newArrayList( + CloseableIterable.transform(tasks, t -> ((PositionDeletesScanTask) t).file())); + } + + private > List except(List first, List second) { + Set secondPaths = + second.stream().map(f -> f.path().toString()).collect(Collectors.toSet()); + return first.stream() + .filter(f -> !secondPaths.contains(f.path().toString())) + .collect(Collectors.toList()); + } + + private void assertNotContains(List original, List rewritten) { + Set originalPaths = + original.stream().map(f -> f.path().toString()).collect(Collectors.toSet()); + Set rewrittenPaths = + rewritten.stream().map(f -> f.path().toString()).collect(Collectors.toSet()); + rewrittenPaths.retainAll(originalPaths); + Assert.assertEquals(0, rewrittenPaths.size()); + } + + private void assertLocallySorted(List deleteFiles) { + for (DeleteFile deleteFile : deleteFiles) { + Dataset deletes = 
+ spark.read().format("iceberg").load("default." + TABLE_NAME + ".position_deletes"); + deletes.filter(deletes.col("delete_file_path").equalTo(deleteFile.path().toString())); + List rows = deletes.collectAsList(); + Assert.assertTrue("Empty delete file found", rows.size() > 0); + int lastPos = 0; + String lastPath = ""; + for (Row row : rows) { + String path = row.getAs("file_path"); + long pos = row.getAs("pos"); + if (path.compareTo(lastPath) < 0) { + Assert.fail(String.format("File_path not sorted, Found %s after %s", path, lastPath)); + } else if (path.equals(lastPath)) { + Assert.assertTrue("Pos not sorted", pos >= lastPos); + } + } + } + } + + private String name(Table table) { + String[] splits = table.name().split("\\."); + Assert.assertEquals(3, splits.length); + return String.format("%s.%s", splits[1], splits[2]); + } + + private long size(List deleteFiles) { + return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum(); + } + + private void checkResult( + Result result, + List rewrittenDeletes, + List newDeletes, + int expectedGroups) { + Assert.assertEquals( + "Expected rewritten delete file count does not match", + rewrittenDeletes.size(), + result.rewrittenDeleteFilesCount()); + Assert.assertEquals( + "Expected new delete file count does not match", + newDeletes.size(), + result.addedDeleteFilesCount()); + Assert.assertEquals( + "Expected rewritten delete byte count does not match", + size(rewrittenDeletes), + result.rewrittenBytesCount()); + Assert.assertEquals( + "Expected new delete byte count does not match", + size(newDeletes), + result.addedBytesCount()); + + Assert.assertEquals( + "Expected rewrite group count does not match", + expectedGroups, + result.rewriteResults().size()); + Assert.assertEquals( + "Expected rewritten delete file count in all groups to match", + rewrittenDeletes.size(), + result.rewriteResults().stream() + .mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount) + .sum()); + Assert.assertEquals( + "Expected added delete file count in all groups to match", + newDeletes.size(), + result.rewriteResults().stream() + .mapToInt(FileGroupRewriteResult::addedDeleteFilesCount) + .sum()); + Assert.assertEquals( + "Expected rewritten delete bytes in all groups to match", + size(rewrittenDeletes), + result.rewriteResults().stream() + .mapToLong(FileGroupRewriteResult::rewrittenBytesCount) + .sum()); + Assert.assertEquals( + "Expected added delete bytes in all groups to match", + size(newDeletes), + result.rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum()); + } + + private void checkSequenceNumbers( + Table table, List rewrittenDeletes, List addedDeletes) { + StructLikeMap> rewrittenFilesPerPartition = + groupPerPartition(table, rewrittenDeletes); + StructLikeMap> addedFilesPerPartition = groupPerPartition(table, addedDeletes); + for (StructLike partition : rewrittenFilesPerPartition.keySet()) { + Long maxRewrittenSeq = + rewrittenFilesPerPartition.get(partition).stream() + .mapToLong(ContentFile::dataSequenceNumber) + .max() + .getAsLong(); + List addedPartitionFiles = addedFilesPerPartition.get(partition); + if (addedPartitionFiles != null) { + addedPartitionFiles.forEach( + d -> + Assert.assertEquals( + "Sequence number should be max of rewritten set", + d.dataSequenceNumber(), + maxRewrittenSeq)); + } + } + } + + private StructLikeMap> groupPerPartition( + Table table, List deleteFiles) { + StructLikeMap> result = + StructLikeMap.create(Partitioning.partitionType(table)); + for (DeleteFile deleteFile : 
deleteFiles) { + StructLike partition = deleteFile.partition(); + List partitionFiles = result.get(partition); + if (partitionFiles == null) { + partitionFiles = Lists.newArrayList(); + } + partitionFiles.add(deleteFile); + result.put(partition, partitionFiles); + } + return result; + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/FourColumnRecord.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/FourColumnRecord.java new file mode 100644 index 000000000000..0f9529e4d105 --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/FourColumnRecord.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Objects; + +public class FourColumnRecord { + private Integer c1; + private String c2; + private String c3; + private String c4; + + public FourColumnRecord() {} + + public FourColumnRecord(Integer c1, String c2, String c3, String c4) { + this.c1 = c1; + this.c2 = c2; + this.c3 = c3; + this.c4 = c4; + } + + public Integer getC1() { + return c1; + } + + public void setC1(Integer c1) { + this.c1 = c1; + } + + public String getC2() { + return c2; + } + + public void setC2(String c2) { + this.c2 = c2; + } + + public String getC3() { + return c3; + } + + public void setC3(String c3) { + this.c3 = c3; + } + + public String getC4() { + return c4; + } + + public void setC4(String c4) { + this.c4 = c4; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FourColumnRecord that = (FourColumnRecord) o; + return Objects.equals(c1, that.c1) + && Objects.equals(c2, that.c2) + && Objects.equals(c3, that.c3) + && Objects.equals(c3, that.c4); + } + + @Override + public int hashCode() { + return Objects.hash(c1, c2, c3, c4); + } + + @Override + public String toString() { + return "ThreeColumnRecord{" + + "c1=" + + c1 + + ", c2='" + + c2 + + '\'' + + ", c3='" + + c3 + + '\'' + + ", c4='" + + c4 + + '\'' + + '}'; + } +}
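Usage sketch (not part of the patch): the snippet below shows how the new action might be invoked once this change lands. It assumes an existing SparkSession and an Iceberg Table that already carries position deletes; the wrapper class name RewritePositionDeletesExample is illustrative, while SparkActions.get, rewritePositionDeletes, and the option constants are the ones introduced or referenced above.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RewritePositionDeletesExample {

  // Bin-packs and rewrites the table's position delete files, one file group at a time.
  public static RewritePositionDeleteFiles.Result rewrite(SparkSession spark, Table table) {
    return SparkActions.get(spark)
        .rewritePositionDeletes(table)
        // commit file groups in batches as they finish instead of in one commit at the end
        .option(RewritePositionDeleteFiles.PARTIAL_PROGRESS_ENABLED, "true")
        // rewrite up to four file groups concurrently
        .option(RewritePositionDeleteFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "4")
        .execute();
  }
}

With partial progress enabled, doExecuteWithPartialProgress batches the finished groups into at most PARTIAL_PROGRESS_MAX_COMMITS commits; without it, doExecute commits all rewritten groups in a single operation and aborts the completed groups if any group fails to be written.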