diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java index 49df7fc9ff1d..ae15f42804c4 100644 --- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java @@ -42,4 +42,13 @@ public interface RewriteFiles extends SnapshotUpdate { * @return this for method chaining */ RewriteFiles rewriteFiles(Set filesToDelete, Set filesToAdd); + + /** + * Add a rewrite that replaces one set of delete files with another set that contains the same deleted rows. + * + * @param deletesToDelete delete files that will be replaced, cannot be null or empty. + * @param deletesToAdd delete files that will be added, cannot be null or empty. + * @return this for method chaining + */ + RewriteFiles rewriteDeletes(Set deletesToDelete, Set deletesToAdd); } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index d2daac5f9b39..6e9166a72acb 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set filesToDelete, Set file return this; } + + @Override + public RewriteFiles rewriteDeletes(Set deletesToDelete, Set deletesToAdd) { + Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(), + "Files to delete cannot be null or empty"); + Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(), + "Files to add cannot be null or empty"); + + for (DeleteFile toDelete : deletesToDelete) { + delete(toDelete); + } + + for (DeleteFile toAdd : deletesToAdd) { + add(toAdd); + } + + return this; + } } diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index f0788361aae8..4c4123d62982 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -24,6 +24,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; @@ -104,4 +105,13 @@ public EncryptedOutputFile newOutputFile(PartitionKey key) { OutputFile rawOutputFile = io.newOutputFile(newDataLocation); return encryptionManager.encrypt(rawOutputFile); } + + /** + * Generates an EncryptedOutputFile for the given partition value, for use by partitioned writers.
+ */ + public EncryptedOutputFile newOutputFile(StructLike partition) { + String newDataLocation = locations.newDataLocation(spec, partition, generateFilename()); + OutputFile rawOutputFile = io.newOutputFile(newDataLocation); + return encryptionManager.encrypt(rawOutputFile); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index c9f8c5a155f5..000465a30da3 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -28,7 +28,7 @@ import java.util.Set; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -37,7 +37,7 @@ import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; -class SortedPosDeleteWriter implements Closeable { +public class SortedPosDeleteWriter implements Closeable { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; private final Map>> posDeletes = Maps.newHashMap(); @@ -48,7 +48,7 @@ class SortedPosDeleteWriter implements Closeable { private final FileAppenderFactory appenderFactory; private final OutputFileFactory fileFactory; private final FileFormat format; - private final PartitionKey partition; + private final StructLike partition; private final long recordsNumThreshold; private int records = 0; @@ -56,7 +56,7 @@ class SortedPosDeleteWriter implements Closeable { SortedPosDeleteWriter(FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileFormat format, - PartitionKey partition, + StructLike partition, long recordsNumThreshold) { this.appenderFactory = appenderFactory; this.fileFactory = fileFactory; @@ -65,10 +65,10 @@ class SortedPosDeleteWriter implements Closeable { this.recordsNumThreshold = recordsNumThreshold; } - SortedPosDeleteWriter(FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileFormat format, - PartitionKey partition) { + public SortedPosDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileFormat format, + StructLike partition) { this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD); } diff --git a/core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java b/core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java new file mode 100644 index 000000000000..60403160823c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import java.util.List; +import java.util.function.Predicate; + +public class ChainOrFilter extends Filter { + private final List> filters; + + public ChainOrFilter(List> filters) { + this.filters = filters; + } + + @Override + protected boolean shouldKeep(T item) { + for (Predicate filter : filters) { + if (filter.test(item)) { + return true; + } + } + + return false; + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 1ddfa270320c..d7fdde3b8ba2 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import org.apache.iceberg.Accessor; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -48,6 +49,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ChainOrFilter; +import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; import org.apache.parquet.Preconditions; @@ -110,7 +113,42 @@ public CloseableIterable filter(CloseableIterable records) { return applyEqDeletes(applyPosDeletes(records)); } - private CloseableIterable applyEqDeletes(CloseableIterable records) { + public CloseableIterable matchEqDeletes(CloseableIterable records) { + if (eqDeletes.isEmpty()) { + return records; + } + + Multimap, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); + for (DeleteFile delete : eqDeletes) { + filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete); + } + + List> deleteSetFilters = Lists.newArrayList(); + for (Map.Entry, Collection> entry : filesByDeleteIds.asMap().entrySet()) { + Set ids = entry.getKey(); + Iterable deletes = entry.getValue(); + + Schema deleteSchema = TypeUtil.select(requiredSchema, ids); + + // a projection to select and reorder fields of the file schema to match the delete rows + StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema); + + Iterable> deleteRecords = Iterables.transform(deletes, + delete -> openDeletes(delete, deleteSchema)); + StructLikeSet deleteSet = Deletes.toEqualitySet( + // copy the delete records because they will be held in a set + CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy), + deleteSchema.asStruct()); + + Predicate predicate = record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); + deleteSetFilters.add(predicate); + } + + Filter findDeleteRows = new ChainOrFilter<>(deleteSetFilters); + return findDeleteRows.filter(records); + } + + protected CloseableIterable applyEqDeletes(CloseableIterable records) { if (eqDeletes.isEmpty()) { return records; } diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java b/spark/src/main/java/org/apache/iceberg/actions/Actions.java index 8c39e698f260..9d8b54ec6304 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java +++ b/spark/src/main/java/org/apache/iceberg/actions/Actions.java @@ -82,6 +82,10 @@ public ExpireSnapshotsAction expireSnapshots() { return new 
ExpireSnapshotsAction(spark, table); } + public ReplaceDeleteAction replaceEqDeleteToPosDelete() { + return new ReplaceDeleteAction(spark, table); + } + /** * Converts the provided table into an Iceberg table in place. The table will no longer be accessible by it's * previous implementation diff --git a/spark/src/main/java/org/apache/iceberg/actions/DeleteRewriteActionResult.java b/spark/src/main/java/org/apache/iceberg/actions/DeleteRewriteActionResult.java new file mode 100644 index 000000000000..6219c20f09f8 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/actions/DeleteRewriteActionResult.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.util.List; +import org.apache.iceberg.DeleteFile; + +public class DeleteRewriteActionResult { + private List posDeletes; + private List eqDeletes; + + public DeleteRewriteActionResult(List eqDeletes, List posDeletes) { + this.eqDeletes = eqDeletes; + this.posDeletes = posDeletes; + } + + public List deletedFiles() { + return eqDeletes; + } + + public List addedFiles() { + return posDeletes; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java b/spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java new file mode 100644 index 000000000000..7a9ed5b3fd22 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.source.DeleteRewriter; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplaceDeleteAction extends + BaseSnapshotUpdateAction { + private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class); + private final Table table; + private final JavaSparkContext sparkContext; + private FileIO fileIO; + private final EncryptionManager encryptionManager; + private final boolean caseSensitive; + private final PartitionSpec spec; + private final long targetSizeInBytes; + private final int splitLookback; + private final long splitOpenFileCost; + + public ReplaceDeleteAction(SparkSession spark, Table table) { + this.table = table; + this.sparkContext = new JavaSparkContext(spark.sparkContext()); + this.fileIO = fileIO(); + this.encryptionManager = table.encryption(); + this.caseSensitive = false; + this.spec = table.spec(); + + long splitSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_SIZE, + TableProperties.SPLIT_SIZE_DEFAULT); + long targetFileSize = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + this.targetSizeInBytes = Math.min(splitSize, targetFileSize); + + this.splitLookback = PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.SPLIT_LOOKBACK, + TableProperties.SPLIT_LOOKBACK_DEFAULT); + this.splitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_OPEN_FILE_COST, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + } + + protected FileIO fileIO() { + if (this.fileIO == null) { + this.fileIO = SparkUtil.serializableFileIO(table()); + } + return this.fileIO; + 
} + + @Override + protected Table table() { + return table; + } + + @Override + public DeleteRewriteActionResult execute() { + CloseableIterable fileScanTasks = null; + try { + fileScanTasks = table.newScan() + .caseSensitive(caseSensitive) + .ignoreResiduals() + .planFiles(); + } finally { + try { + if (fileScanTasks != null) { + fileScanTasks.close(); + } + } catch (IOException ioe) { + LOG.warn("Failed to close task iterable", ioe); + } + } + + CloseableIterable tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan -> + scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)) + ); + + Set eqDeletes = Sets.newHashSet(); + tasksWithEqDelete.forEach(task -> { + eqDeletes.addAll(task.deletes().stream() + .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES)) + .collect(Collectors.toList())); + }); + + Map> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator()); + + // Split and combine tasks under each partition + // TODO: can we split task? + List> combinedScanTasks = groupedTasks.entrySet().stream() + .map(entry -> { + CloseableIterable splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes); + return Pair.of(entry.getKey().get(), + TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost)); + }) + .flatMap(pair -> StreamSupport.stream(CloseableIterable + .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false) + ) + .collect(Collectors.toList()); + + if (!combinedScanTasks.isEmpty()) { + JavaRDD> taskRDD = sparkContext.parallelize(combinedScanTasks, + combinedScanTasks.size()); + Broadcast io = sparkContext.broadcast(fileIO()); + Broadcast encryption = sparkContext.broadcast(encryptionManager()); + + DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption); + List posDeletes = deleteRewriter.toPosDeletes(taskRDD); + + if (!eqDeletes.isEmpty() && !posDeletes.isEmpty()) { + rewriteDeletes(Lists.newArrayList(eqDeletes), posDeletes); + return new DeleteRewriteActionResult(Lists.newArrayList(eqDeletes), posDeletes); + } + } + + return new DeleteRewriteActionResult(Collections.emptyList(), Collections.emptyList()); + } + + protected EncryptionManager encryptionManager() { + return encryptionManager; + } + + private Map> groupTasksByPartition( + CloseableIterator tasksIter) { + ListMultimap tasksGroupedByPartition = Multimaps.newListMultimap( + Maps.newHashMap(), Lists::newArrayList); + try (CloseableIterator iterator = tasksIter) { + iterator.forEachRemaining(task -> { + StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition()); + tasksGroupedByPartition.put(structLike, task); + }); + } catch (IOException e) { + LOG.warn("Failed to close task iterator", e); + } + return tasksGroupedByPartition.asMap(); + } + + + private void rewriteDeletes(List eqDeletes, List posDeletes) { + Preconditions.checkArgument(eqDeletes.stream().allMatch(f -> f.content().equals(FileContent.EQUALITY_DELETES)), + "The deletes to be converted should be equality deletes"); + Preconditions.checkArgument(posDeletes.stream().allMatch(f -> f.content().equals(FileContent.POSITION_DELETES)), + "The converted deletes should be position deletes"); + try { + RewriteFiles rewriteFiles = table.newRewrite(); + rewriteFiles.rewriteDeletes(Sets.newHashSet(eqDeletes), Sets.newHashSet(posDeletes)); + commit(rewriteFiles); + } catch (Exception e) { + 
Tasks.foreach(Iterables.transform(posDeletes, f -> f.path().toString())) + .noRetry() + .suppressFailureWhenFinished() + .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc)) + .run(fileIO::deleteFile); + throw e; + } + } + + @Override + protected ReplaceDeleteAction self() { + return this; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java new file mode 100644 index 000000000000..e1d4f67bcdce --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.SortedPosDeleteWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.TaskContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + +public class DeleteRewriter implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(DeleteRewriter.class); + private final PartitionSpec spec; + private final Map properties; + private final Schema schema; + private final FileFormat format; + private final Broadcast io; + private final Broadcast encryptionManager; + private final LocationProvider locations; + private final String nameMapping; + private final boolean caseSensitive; + + public DeleteRewriter(Table table, boolean caseSensitive, + Broadcast io, Broadcast encryptionManager) { + this.spec =
table.spec(); + this.schema = table.schema(); + this.locations = table.locationProvider(); + this.caseSensitive = caseSensitive; + this.io = io; + this.encryptionManager = encryptionManager; + this.properties = table.properties(); + this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING); + + String formatString = table.properties().getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + } + + public List toPosDeletes(JavaRDD> taskRDD) { + JavaRDD> dataFilesRDD = taskRDD.map(this::toPosDeletes); + + return dataFilesRDD.collect().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + public List toPosDeletes(Pair task) throws Exception { + TaskContext context = TaskContext.get(); + int partitionId = context.partitionId(); + long taskId = context.taskAttemptId(); + + Schema metaSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION); + Schema expectedSchema = TypeUtil.join(metaSchema, schema); + + DeleteRowReader deleteRowReader = new DeleteRowReader(task.second(), schema, expectedSchema, nameMapping, + io.value(), encryptionManager.value(), caseSensitive); + + StructType structType = SparkSchemaUtil.convert(schema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec); + + OutputFileFactory fileFactory = new OutputFileFactory( + spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); + + SortedPosDeleteWriter posDeleteWriter = + new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, task.first()); + + try { + while (deleteRowReader.next()) { + InternalRow row = deleteRowReader.get(); + posDeleteWriter.delete(row.getString(0), row.getLong(1)); + } + + deleteRowReader.close(); + deleteRowReader = null; + + return Lists.newArrayList(posDeleteWriter.complete()); + + } catch (Throwable originalThrowable) { + try { + LOG.error("Aborting task", originalThrowable); + context.markTaskFailed(originalThrowable); + + LOG.error("Aborting commit for partition {} (task {}, attempt {}, stage {}.{})", + partitionId, taskId, context.attemptNumber(), context.stageId(), context.stageAttemptNumber()); + if (deleteRowReader != null) { + deleteRowReader.close(); + } + + // clean up files created by this writer + Tasks.foreach(Iterables.concat(posDeleteWriter.complete())) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.value().deleteFile(file.path().toString())); + + LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage {}.{})", + partitionId, taskId, context.taskAttemptId(), context.stageId(), context.stageAttemptNumber()); + + } catch (Throwable inner) { + if (originalThrowable != inner) { + originalThrowable.addSuppressed(inner); + LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner); + } + } + + if (originalThrowable instanceof Exception) { + throw originalThrowable; + } else { + throw new RuntimeException(originalThrowable); + } + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java new file mode 100644 index 000000000000..d8dc5526677d --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.util.Map; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; + +public class DeleteRowReader extends RowDataReader { + private final Schema tableSchema; + private final Schema expectedSchema; + + public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping, + FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) { + super(task, schema, schema, nameMapping, io, encryptionManager, + caseSensitive); + this.tableSchema = schema; + this.expectedSchema = expectedSchema; + } + + @Override + CloseableIterator open(FileScanTask task) { + SparkDeleteMatcher matches = new SparkDeleteMatcher(task, tableSchema, expectedSchema); + + // schema or rows returned by readers + Schema requiredSchema = matches.requiredSchema(); + Map idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant); + DataFile file = task.file(); + + // update the current file for Spark's filename() function + InputFileBlockHolder.set(file.path().toString(), task.start(), task.length()); + + return matches.matchEqDeletes(open(task, requiredSchema, idToConstant)).iterator(); + } + + protected class SparkDeleteMatcher extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteMatcher(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task, tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InputFile getInputFile(String location) { + return DeleteRowReader.this.getInputFile(location); + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index b021c402cb74..c8291ea44774 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -95,7 +95,7 @@ CloseableIterator open(FileScanTask task) { return deletes.filter(open(task, requiredSchema, idToConstant)).iterator(); } - private 
CloseableIterable open(FileScanTask task, Schema readSchema, Map idToConstant) { + protected CloseableIterable open(FileScanTask task, Schema readSchema, Map idToConstant) { CloseableIterable iter; if (task.isDataTask()) { iter = newDataIterable(task.asDataTask(), readSchema); @@ -215,7 +215,7 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq()); } - private class SparkDeleteFilter extends DeleteFilter { + protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction.java new file mode 100644 index 000000000000..190d004ced57 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public abstract class TestRewriteDeletesAction extends SparkTestBase { + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + private static final Schema SCHEMA = new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private String tableLocation = null; + + @Before + public void setupTableLocation() throws Exception { + File tableDir = temp.newFolder(); + this.tableLocation = tableDir.toURI().toString(); + } + + @Test + public void testRewriteDeletesWithNoEqDeletes() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .build(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + table.refresh(); + + DeleteRewriteActionResult result = Actions.forTable(table).replaceEqDeleteToPosDelete().execute(); + Assert.assertTrue("Shouldn't contain equality deletes", result.deletedFiles().isEmpty()); + Assert.assertTrue("Shouldn't generate position deletes", result.addedFiles().isEmpty()); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records1); + 
expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDeletesUnpartitionedTable() throws IOException { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + + table.refresh(); + + OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + List equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("c3"); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, null); + + Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c3", "AAAA")); + + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter( + createEncryptedOutputFile(createPartitionKey(table, record), fileFactory), + FileFormat.PARQUET, + createPartitionKey(table, record)); + + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.delete(record); + } + + table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit(); + + Actions.forTable(table).replaceEqDeleteToPosDelete().execute(); + + CloseableIterable tasks = CloseableIterable.filter( + table.newScan().planFiles(), task -> task.deletes().stream() + .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)) + ); + Assert.assertFalse("Should not contain any equality deletes", tasks.iterator().hasNext()); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(records1.get(1)); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDeletesInPartitionedTable() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .truncate("c3", 2) + .build(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAA", "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + table.refresh(); + + 
OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + List equalityFieldIds = Lists.newArrayList(table.schema().findField("c2").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("c2"); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, null); + + PartitionKey key = createPartitionKey(table, + GenericRecord.create(table.schema()).copy(ImmutableMap.of("c1", 1, "c2", "ignore", "c3", "AAAA"))); + + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter( + createEncryptedOutputFile(key, fileFactory), FileFormat.PARQUET, key); + + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.delete(GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c2", "AAAAAAAA"))); + } + + table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit(); + + Actions.forTable(table).replaceEqDeleteToPosDelete().execute(); + + CloseableIterable tasks = CloseableIterable.filter( + table.newScan().planFiles(), task -> task.deletes().stream() + .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)) + ); + Assert.assertFalse("Should not contain any equality deletes", tasks.iterator().hasNext()); + + table.refresh(); + CloseableIterable newTasks = table.newScan().ignoreResiduals().planFiles(); + List deleteFiles = Lists.newArrayList(); + newTasks.forEach(task -> { + deleteFiles.addAll(task.deletes()); + }); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(records1.get(1)); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDeletesWithDeletes() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .build(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + + List records3 = Lists.newArrayList( + new ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE") + ); + writeRecords(records3); + table.refresh(); + + DataFile fileForPosDelete = table.currentSnapshot().addedFiles().iterator().next(); + + OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + List equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("c3"); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, null); + + // write equality delete + Record partitionRecord = 
GenericRecord.create(table.schema().select("c1")).copy(ImmutableMap.of("c1", 1)); + EncryptedOutputFile file = createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory); + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter( + file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord)); + Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c3", "AAAA")); + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.delete(record); + } + table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit(); + + // write positional delete + partitionRecord = partitionRecord.copy(ImmutableMap.of("c1", 3)); + file = createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory); + PositionDeleteWriter posDeleteWriter = appenderFactory.newPosDeleteWriter( + file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord)); + posDeleteWriter.delete(fileForPosDelete.path(), 0); + posDeleteWriter.close(); + table.newRowDelta().addDeletes(posDeleteWriter.toDeleteFile()).commit(); + + DeleteRewriteActionResult result = Actions.forTable(table).replaceEqDeleteToPosDelete().execute(); + Assert.assertEquals("Should contain one equality delete to delete", 1, result.deletedFiles().size()); + Assert.assertEquals("Should contain one position delete to add", 1, result.addedFiles().size()); + + CloseableIterable tasks = CloseableIterable.filter( + table.newScan().planFiles(), task -> task.deletes().stream() + .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)) + ); + Assert.assertFalse("Should not contain any equality deletes", tasks.iterator().hasNext()); + + table.refresh(); + CloseableIterable newTasks = table.newScan().ignoreResiduals().planFiles(); + List deleteFiles = Lists.newArrayList(); + newTasks.forEach(task -> { + deleteFiles.addAll(task.deletes()); + }); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(records1.get(1)); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + private void writeRecords(List records) { + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + writeDF(df); + } + + private void writeDF(Dataset df) { + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + } + + private PartitionKey createPartitionKey(Table table, Record record) { + if (table.spec().isUnpartitioned()) { + return null; + } + + PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); + partitionKey.partition(record); + + return partitionKey; + } + + private EncryptedOutputFile createEncryptedOutputFile(PartitionKey partition, OutputFileFactory fileFactory) { + if (partition == null) { + return fileFactory.newOutputFile(); + } else { + return fileFactory.newOutputFile(partition); + } + } + +} diff --git a/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesActions24.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesActions24.java new file mode 100644 index 000000000000..76a388c38c05 --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesActions24.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +public class TestRewriteDeletesActions24 extends TestRewriteDeletesAction { +} diff --git a/spark3/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction3.java b/spark3/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction3.java new file mode 100644 index 000000000000..8881f55373aa --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction3.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +public class TestRewriteDeletesAction3 extends TestRewriteDeletesAction { +}
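
Usage note (not part of the patch): a minimal sketch of how the new action introduced by this diff might be invoked. It assumes an existing format-v2 Iceberg `Table` named `table`; the call chain and result accessors are taken from the action and tests above, everything else is illustrative.

    import java.util.List;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.Actions;
    import org.apache.iceberg.actions.DeleteRewriteActionResult;

    // `table` is assumed to be an already-loaded format-v2 table with equality deletes.
    // The action scans for files with equality deletes, rewrites them as position deletes,
    // and commits a rewrite that swaps the old delete files for the new ones.
    DeleteRewriteActionResult result = Actions.forTable(table)
        .replaceEqDeleteToPosDelete()
        .execute();

    // Equality delete files that were replaced, and the position delete files added in their place.
    List<DeleteFile> replacedEqDeletes = result.deletedFiles();
    List<DeleteFile> addedPosDeletes = result.addedFiles();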