diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java index f2564ddb703b..677e69d8bcec 100644 --- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java +++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java @@ -74,4 +74,11 @@ default ExpireSnapshots expireSnapshots(Table table) { default DeleteReachableFiles deleteReachableFiles(String metadataLocation) { throw new UnsupportedOperationException(this.getClass().getName() + " does not implement deleteReachableFiles"); } + + /** + * Instantiates an action to rewrite deletes. + */ + default RewriteDeletes rewriteDeletes(Table table) { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement rewriteDeletes"); + } } diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDeletes.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDeletes.java new file mode 100644 index 000000000000..0e48cf5891ec --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDeletes.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.util.Set; +import org.apache.iceberg.DeleteFile; + +public interface RewriteDeletes extends SnapshotUpdate<RewriteDeletes, RewriteDeletes.Result> { + + /** + * Sets the implementation class name of the rewrite delete strategy. + * + * @return this for method chaining + */ + RewriteDeletes strategy(String strategyImpl); + + /** + * The action result that contains a summary of the execution. + */ + interface Result { + /** + * Returns the delete files that were rewritten and replaced. + */ + Set<DeleteFile> deletedFiles(); + + /** + * Returns the added delete files. + */ + Set<DeleteFile> addedFiles(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDeletesResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDeletesResult.java new file mode 100644 index 000000000000..af42c08071c0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDeletesResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.util.Set; +import org.apache.iceberg.DeleteFile; + +public class BaseRewriteDeletesResult implements RewriteDeletes.Result { + + private final Set<DeleteFile> deletesToReplace; + private final Set<DeleteFile> deletesToAdd; + + public BaseRewriteDeletesResult(Set<DeleteFile> deletesToReplace, Set<DeleteFile> deletesToAdd) { + this.deletesToReplace = deletesToReplace; + this.deletesToAdd = deletesToAdd; + } + + @Override + public Set<DeleteFile> deletedFiles() { + return deletesToReplace; + } + + @Override + public Set<DeleteFile> addedFiles() { + return deletesToAdd; + } +} diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDeleteStrategy.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDeleteStrategy.java new file mode 100644 index 000000000000..d9f6365ba32e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDeleteStrategy.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; + +public interface RewriteDeleteStrategy { + + /** + * Returns the name of this rewrite deletes strategy. + */ + String name(); + + /** + * Returns the table being modified by this rewrite strategy. + */ + Table table(); + + /** + * Select the deletes to rewrite. + * + * @param dataFiles iterable of FileScanTasks for data files in a given partition + * @return iterable of the original delete files to be replaced. + */ + Iterable<DeleteFile> selectDeletesToRewrite(Iterable<FileScanTask> dataFiles); + + /** + * Define how to rewrite the deletes. + * + * @param deleteFilesToRewrite a group of delete files to be rewritten together + * @return set of delete files used to replace the original delete files. + */ + Set<DeleteFile> rewriteDeletes(Set<DeleteFile> deleteFilesToRewrite); + + /** + * Groups delete files into sets which will be processed in a single executable unit. Each group will end up being + * committed as an independent set of changes. This creates the jobs which will eventually be run by the underlying + * Action. 
+ * + * @param deleteFiles iterable of DeleteFile to be rewritten + * @return iterable of sets of delete files which will be processed together + */ + Iterable<Set<DeleteFile>> planDeleteGroups(Iterable<DeleteFile> deleteFiles); + + /** + * Sets options to be used with this strategy. + */ + RewriteDeleteStrategy options(Map<String, String> options); + + /** + * Returns a set of options which this rewrite strategy can use. This is an allowed-list and any options not + * specified here will be rejected at runtime. + */ + Set<String> validOptions(); +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 62154f7d6071..23f9b4b845b1 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -24,6 +24,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.iceberg.Accessor; import org.apache.iceberg.MetadataColumns; @@ -113,6 +114,13 @@ public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes); } + public static <T> CloseableIterable<T> streamingDeletedRowMarker(CloseableIterable<T> rows, + Function<T, Long> rowToPosition, + CloseableIterable<Long> posDeletes, + Consumer<T> deleteMarker) { + return new PositionStreamDeletedRowMarker<>(rows, rowToPosition, posDeletes, deleteMarker); + } + public static <T extends StructLike> CloseableIterable<Long> deletePositions(CharSequence dataLocation, CloseableIterable<T> deleteFile) { return deletePositions(dataLocation, ImmutableList.of(deleteFile)); @@ -176,7 +184,7 @@ public CloseableIterator<T> iterator() { CloseableIterator<T> iter; if (deletePosIterator.hasNext()) { - iter = new PositionFilterIterator(rows.iterator(), deletePosIterator); + iter = positionIterator(rows.iterator(), deletePosIterator); } else { iter = rows.iterator(); try { @@ -191,7 +199,12 @@ return iter; } - private class PositionFilterIterator extends FilterIterator<T> { + protected FilterIterator<T> positionIterator(CloseableIterator<T> items, + CloseableIterator<Long> newDeletePositions) { + return new PositionFilterIterator(items, newDeletePositions); + } + + protected class PositionFilterIterator extends FilterIterator<T> { private final CloseableIterator<Long> deletePosIterator; private long nextDeletePos; @@ -233,6 +246,37 @@ public void close() { } } + static class PositionStreamDeletedRowMarker<T> extends PositionStreamDeleteFilter<T> { + private final Consumer<T> deleteMarker; + + private PositionStreamDeletedRowMarker(CloseableIterable<T> rows, Function<T, Long> extractPos, + CloseableIterable<Long> deletePositions, + Consumer<T> deleteMarker) { + super(rows, extractPos, deletePositions); + this.deleteMarker = deleteMarker; + } + + @Override + protected FilterIterator<T> positionIterator(CloseableIterator<T> items, + CloseableIterator<Long> deletePositions) { + return new PositionMarkerIterator(items, deletePositions); + } + + private class PositionMarkerIterator extends PositionFilterIterator { + private PositionMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) { + super(items, deletePositions); + } + + @Override + protected boolean shouldKeep(T row) { + if (!super.shouldKeep(row)) { + deleteMarker.accept(row); + } + return true; + } + } + } + private static class DataFileFilter<T extends StructLike> extends Filter<T> { private static final Comparator<CharSequence> CHARSEQ_COMPARATOR = Comparators.charSequences(); private final CharSequence dataLocation; diff --git 
a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 0da20579a62a..90d15060c913 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -36,7 +36,7 @@ import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; -class SortedPosDeleteWriter implements Closeable { +public class SortedPosDeleteWriter implements Closeable { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; private final Map>> posDeletes = Maps.newHashMap(); @@ -52,11 +52,11 @@ class SortedPosDeleteWriter implements Closeable { private int records = 0; - SortedPosDeleteWriter(FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileFormat format, - StructLike partition, - long recordsNumThreshold) { + public SortedPosDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileFormat format, + StructLike partition, + long recordsNumThreshold) { this.appenderFactory = appenderFactory; this.fileFactory = fileFactory; this.format = format; @@ -64,10 +64,10 @@ class SortedPosDeleteWriter implements Closeable { this.recordsNumThreshold = recordsNumThreshold; } - SortedPosDeleteWriter(FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileFormat format, - StructLike partition) { + public SortedPosDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileFormat format, + StructLike partition) { this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD); } diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index 4a1ad5af5df1..73c6532db8dc 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -19,16 +19,29 @@ package org.apache.iceberg.util; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; import java.util.function.Function; import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TableScanUtil { + private static final Logger LOG = LoggerFactory.getLogger(TableScanUtil.class); + private TableScanUtil() { } @@ -64,4 +77,21 @@ public static CloseableIterable planTasks(CloseableIterable> groupTasksByPartition( + PartitionSpec spec, + CloseableIterator tasksIter) { + ListMultimap tasksGroupedByPartition = Multimaps.newListMultimap( + Maps.newHashMap(), Lists::newArrayList); + try (CloseableIterator iterator = tasksIter) { + iterator.forEachRemaining(task -> { + StructLikeWrapper structLike = 
StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition()); + tasksGroupedByPartition.put(structLike, task); + }); + } catch (IOException e) { + LOG.warn("Failed to close task iterator", e); + } + + return tasksGroupedByPartition.asMap(); + } } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a8eb13cdfa68..8e7bc9ec1b91 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import org.apache.iceberg.Accessor; import org.apache.iceberg.DataFile; @@ -66,6 +67,7 @@ public abstract class DeleteFilter { private final List eqDeletes; private final Schema requiredSchema; private final Accessor posAccessor; + private Integer deleteMarkerIndex = null; protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; @@ -96,6 +98,29 @@ public Schema requiredSchema() { return requiredSchema; } + protected int deleteMarkerIndex() { + if (deleteMarkerIndex != null) { + return deleteMarkerIndex; + } + + int index = 0; + for (Types.NestedField field : requiredSchema().columns()) { + if (field.fieldId() != MetadataColumns.IS_DELETED.fieldId()) { + index = index + 1; + } else { + break; + } + } + + deleteMarkerIndex = index; + + return deleteMarkerIndex; + } + + protected abstract Consumer deleteMarker(); + + protected abstract boolean isDeletedRow(T row); + Accessor posAccessor() { return posAccessor; } @@ -112,11 +137,20 @@ public CloseableIterable filter(CloseableIterable records) { return applyEqDeletes(applyPosDeletes(records)); } - private List> applyEqDeletes() { - List> isInDeleteSets = Lists.newArrayList(); + private Filter deletedRowsSelector() { + return new Filter() { + @Override + protected boolean shouldKeep(T item) { + return isDeletedRow(item); + } + }; + } + + private Predicate buildEqDeletePredicate() { if (eqDeletes.isEmpty()) { - return isInDeleteSets; + return null; } + Predicate isDeleted = null; Multimap, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); for (DeleteFile delete : eqDeletes) { @@ -139,43 +173,122 @@ private List> applyEqDeletes() { CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy), deleteSchema.asStruct()); - Predicate isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); - isInDeleteSets.add(isInDeleteSet); + isDeleted = isDeleted == null ? 
record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) : + isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record)))); } - return isInDeleteSets; + return isDeleted; } - public CloseableIterable findEqualityDeleteRows(CloseableIterable records) { + private Predicate buildPosDeletePredicate() { + if (posDeletes.isEmpty()) { + return null; + } + + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + Set deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)); + if (deleteSet.isEmpty()) { + return null; + } + + return record -> deleteSet.contains(pos(record)); + } + + public CloseableIterable keepRowsFromDeletes(CloseableIterable records) { + Predicate isDeletedFromPosDeletes = buildPosDeletePredicate(); + if (isDeletedFromPosDeletes == null) { + return keepRowsFromEqualityDeletes(records); + } + + Predicate isDeletedFromEqDeletes = buildEqDeletePredicate(); + if (isDeletedFromEqDeletes == null) { + return keepRowsFromPosDeletes(records); + } + + CloseableIterable markedRecords; + + if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) { + markedRecords = CloseableIterable.transform(records, record -> { + if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) { + deleteMarker().accept(record); + } + return record; + }); + + } else { + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos, + Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> { + if (!isDeletedRow(record) && isDeletedFromEqDeletes.test(record)) { + deleteMarker().accept(record); + } + return record; + }); + } + return deletedRowsSelector().filter(markedRecords); + } + + private CloseableIterable selectRowsFromDeletes(CloseableIterable records, Predicate isDeleted) { + CloseableIterable markedRecords = CloseableIterable.transform(records, record -> { + if (isDeleted.test(record)) { + deleteMarker().accept(record); + } + return record; + }); + + return deletedRowsSelector().filter(markedRecords); + } + + public CloseableIterable keepRowsFromEqualityDeletes(CloseableIterable records) { // Predicate to test whether a row has been deleted by equality deletions. - Predicate deletedRows = applyEqDeletes().stream() - .reduce(Predicate::or) - .orElse(t -> false); + Predicate isDeleted = buildEqDeletePredicate(); + if (isDeleted == null) { + return CloseableIterable.empty(); + } - Filter deletedRowsFilter = new Filter() { - @Override - protected boolean shouldKeep(T item) { - return deletedRows.test(item); + return selectRowsFromDeletes(records, isDeleted); + } + + public CloseableIterable keepRowsFromPosDeletes(CloseableIterable records) { + // if there are fewer deletes than a reasonable number to keep in memory, use a set + if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) { + // Predicate to test whether a row has been deleted by equality deletions. 
+ Predicate isDeleted = buildPosDeletePredicate(); + if (isDeleted == null) { + return CloseableIterable.empty(); } - }; - return deletedRowsFilter.filter(records); + return selectRowsFromDeletes(records, isDeleted); + } else { + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + CloseableIterable markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos, + Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()); + + return deletedRowsSelector().filter(markedRecords); + } } private CloseableIterable applyEqDeletes(CloseableIterable records) { // Predicate to test whether a row should be visible to user after applying equality deletions. - Predicate remainingRows = applyEqDeletes().stream() - .map(Predicate::negate) - .reduce(Predicate::and) - .orElse(t -> true); + Predicate isDeleted = buildEqDeletePredicate(); + if (isDeleted == null) { + return records; + } + + CloseableIterable markedRecords = CloseableIterable.transform(records, record -> { + if (isDeleted.test(record)) { + deleteMarker().accept(record); + } + return record; + }); Filter remainingRowsFilter = new Filter() { @Override protected boolean shouldKeep(T item) { - return remainingRows.test(item); + return !isDeletedRow(item); } }; - return remainingRowsFilter.filter(records); + return remainingRowsFilter.filter(markedRecords); } private CloseableIterable applyPosDeletes(CloseableIterable records) { @@ -192,7 +305,17 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes))); } - return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes)); + CloseableIterable markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos, + Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()); + + Filter remainingRowsFilter = new Filter() { + @Override + protected boolean shouldKeep(T item) { + return !isDeletedRow(item); + } + }; + + return remainingRowsFilter.filter(markedRecords); } private CloseableIterable openPosDeletes(DeleteFile file) { diff --git a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java index 30109f17f270..bcebc246c6f4 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java @@ -19,6 +19,7 @@ package org.apache.iceberg.data; +import java.util.function.Consumer; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; @@ -40,6 +41,16 @@ protected long pos(Record record) { return (Long) posAccessor().get(record); } + @Override + protected Consumer deleteMarker() { + return record -> record.set(deleteMarkerIndex(), true); + } + + @Override + protected boolean isDeletedRow(Record record) { + return record.get(deleteMarkerIndex(), Boolean.class); + } + @Override protected StructLike asStructLike(Record record) { return asStructLike.wrap(record); diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 70ac77473c5d..2bd53d65a707 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -62,7 +62,7 @@ public abstract class DeleteReadTests { private String tableName = null; protected Table table = null; private List records = 
null; - private DataFile dataFile = null; + protected DataFile dataFile = null; @Before public void writeTestDataFile() throws IOException { diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java index 5a568144d1f7..ce6cab1c98e7 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java @@ -20,7 +20,10 @@ package org.apache.iceberg.flink.source; import java.util.Map; +import java.util.function.Consumer; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.UpdatableRowData; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; @@ -161,6 +164,24 @@ private class FlinkDeleteFilter extends DeleteFilter { this.asStructLike = new RowDataWrapper(FlinkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct()); } + @Override + protected Consumer deleteMarker() { + return record -> { + if (record instanceof GenericRowData) { + ((GenericRowData) record).setField(deleteMarkerIndex(), true); + } else if (record instanceof UpdatableRowData) { + ((UpdatableRowData) record).setField(deleteMarkerIndex(), true); + } else { + throw new UnsupportedOperationException("Can not mark row data"); + } + }; + } + + @Override + protected boolean isDeletedRow(RowData row) { + return row.getBoolean(deleteMarkerIndex()); + } + @Override protected StructLike asStructLike(RowData row) { return asStructLike.wrap(row); diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java new file mode 100644 index 000000000000..1443c8b52588 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.util.Set; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BaseRewriteDeletesResult; +import org.apache.iceberg.actions.RewriteDeleteStrategy; +import org.apache.iceberg.actions.RewriteDeletes; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseRewriteDeletesSparkAction + extends BaseSnapshotUpdateSparkAction + implements RewriteDeletes { + + private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDeletesSparkAction.class); + + private final Table table; + private final FileIO fileIO; + private final SparkSession spark; + + private RewriteDeleteStrategy strategy; + + protected BaseRewriteDeletesSparkAction(SparkSession spark, Table table) { + super(spark); + this.spark = spark; + this.table = table; + this.fileIO = table.io(); + } + + @Override + public RewriteDeletes strategy(String strategyImpl) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(RewriteDeleteStrategy.class) + .impl(strategyImpl, SparkSession.class, Table.class) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize RewriteDeleteStrategy implementation %s: %s", strategyImpl, e.getMessage()), e); + } + + try { + strategy = ctor.newInstance(spark, table); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize RewriteDeleteStrategy, %s does not implement RewriteDeleteStrategy", + strategyImpl), e); + } + + return this; + } + + @Override + public Result execute() { + if (strategy == null) { + // use ConvertEqDeletesStrategy for default right now. 
+ strategy = new ConvertEqDeletesStrategy(spark, table); + } + CloseableIterable fileScanTasks = table.newScan() + .ignoreResiduals() + .planFiles(); + + Set deletesToAdd = Sets.newHashSet(); + + try { + Iterable deletesToReplace = strategy.selectDeletesToRewrite(fileScanTasks); + Iterable> groupedDeletes = strategy.planDeleteGroups(deletesToReplace); + for (Set deletesInGroup : groupedDeletes) { + deletesToAdd.addAll(strategy.rewriteDeletes(deletesInGroup)); + } + + table.newRewrite() + .rewriteFiles(ImmutableSet.of(), Sets.newHashSet(deletesToReplace), + ImmutableSet.of(), Sets.newHashSet(deletesToAdd)) + .commit(); + + return new BaseRewriteDeletesResult(Sets.newHashSet(deletesToReplace), Sets.newHashSet(deletesToAdd)); + } catch (Exception e) { + Tasks.foreach(Iterables.transform(deletesToAdd, f -> f.path().toString())) + .noRetry() + .suppressFailureWhenFinished() + .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc)) + .run(fileIO::deleteFile); + throw e; + } finally { + try { + fileScanTasks.close(); + } catch (IOException io) { + LOG.error("Cannot properly close file iterable while planning for rewrite", io); + } + } + } + + @Override + protected RewriteDeletes self() { + return this; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkActions.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkActions.java index 58b57177cf73..62311c4f0334 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkActions.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkActions.java @@ -24,6 +24,7 @@ import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.actions.DeleteReachableFiles; import org.apache.iceberg.actions.ExpireSnapshots; +import org.apache.iceberg.actions.RewriteDeletes; import org.apache.iceberg.actions.RewriteManifests; import org.apache.spark.sql.SparkSession; @@ -58,4 +59,9 @@ public ExpireSnapshots expireSnapshots(Table table) { public DeleteReachableFiles deleteReachableFiles(String metadataLocation) { return new BaseDeleteReachableFilesSparkAction(spark, metadataLocation); } + + @Override + public RewriteDeletes rewriteDeletes(Table table) { + return new BaseRewriteDeletesSparkAction(spark, table); + } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java b/spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java new file mode 100644 index 000000000000..50b98ae93a6e --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.actions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.RewriteDeleteStrategy; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.source.EqualityDeleteRewriter; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ConvertEqDeletesStrategy.class); + + private final Table table; + private long deleteTargetSizeInBytes; + private int splitLookback; + private long splitOpenFileCost; + + private Map deleteToFileMap; + private final JavaSparkContext sparkContext; + + /** + * Defines whether to split out the result position deletes by data file names. + * + * This should be used in EqualityDeleteRewriter. 
+ */ + public static final String SPLIT_POSITION_DELETE = "split-position-delete.enabled"; + public static final String PARTIAL_COMMIT_ENABLED = "partial-commit.enabled"; + + public ConvertEqDeletesStrategy(SparkSession spark, Table table) { + this.table = table; + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.deleteTargetSizeInBytes = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.DELETE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + this.splitLookback = PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.SPLIT_LOOKBACK, + TableProperties.SPLIT_LOOKBACK_DEFAULT); + this.splitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.SPLIT_OPEN_FILE_COST, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); + } + + @Override + public String name() { + return "CONVERT-EQUALITY-DELETES"; + } + + @Override + public Table table() { + return table; + } + + @Override + public Iterable selectDeletesToRewrite(Iterable dataFiles) { + deleteToFileMap = Maps.newHashMap(); + + StreamSupport.stream(dataFiles.spliterator(), false).forEach(scan -> { + scan.deletes().stream() + .filter(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)) + .forEach(delete -> deleteToFileMap.put(delete, scan)); + }); + + return deleteToFileMap.keySet(); + } + + @Override + public Set rewriteDeletes(Set deleteFilesToRewrite) { + List refDataFiles = deleteFilesToRewrite.stream() + .map(deleteFile -> deleteToFileMap.get(deleteFile)) + .distinct() + .collect(Collectors.toList()); + + if (refDataFiles.isEmpty()) { + return Collections.emptySet(); + } + + Map> filesByPartition = Streams.stream(refDataFiles.listIterator()) + .collect(Collectors.groupingBy(task -> task.file().partition())); + + // Split and combine tasks under each partition + List> combinedScanTasks = filesByPartition.entrySet().stream() + .map(entry -> { + CloseableIterable splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(entry.getValue()), deleteTargetSizeInBytes); + return Pair.of(entry.getKey(), + TableScanUtil.planTasks(splitTasks, deleteTargetSizeInBytes, splitLookback, splitOpenFileCost)); + }).flatMap(pair -> StreamSupport.stream(CloseableIterable + .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false) + ).collect(Collectors.toList()); + + JavaRDD> taskRDD = + sparkContext.parallelize(combinedScanTasks, refDataFiles.size()); + Broadcast io = sparkContext.broadcast(table.io()); + Broadcast encryption = sparkContext.broadcast(table.encryption()); + + EqualityDeleteRewriter deleteRewriter = new EqualityDeleteRewriter(table, true, io, encryption); + + return deleteRewriter.toPosDeletes(taskRDD); + } + + @Override + public Iterable> planDeleteGroups(Iterable deleteFiles) { + List> deletesGroups = Lists.newArrayList(); + deletesGroups.add(Sets.newHashSet(deleteFiles)); + + return deletesGroups; + } + + public RewriteDeleteStrategy options(Map options) { + // TODO: parse the options + return this; + } + + public Set validOptions() { + return ImmutableSet.of( + SPLIT_POSITION_DELETE, + PARTIAL_COMMIT_ENABLED + ); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java similarity index 69% rename from spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java rename to 
spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java index d4328addc759..172a6cf0bc2d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -30,12 +31,15 @@ import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; -public class EqualityDeleteRowReader extends RowDataReader { +public class DeleteRowReader extends RowDataReader { private final Schema expectedSchema; + private final FileContent deleteSelector; - public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) { - super(task, table, table.schema(), caseSensitive); + public DeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, + boolean caseSensitive, FileContent deleteContent) { + super(task, table, expectedSchema, caseSensitive); this.expectedSchema = expectedSchema; + this.deleteSelector = deleteContent; } @Override @@ -50,6 +54,12 @@ CloseableIterator open(FileScanTask task) { // update the current file for Spark's filename() function InputFileBlockHolder.set(file.path().toString(), task.start(), task.length()); - return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator(); + if (deleteSelector == null) { + return matches.keepRowsFromDeletes(open(task, requiredSchema, idToConstant)).iterator(); + } else if (deleteSelector.equals(FileContent.EQUALITY_DELETES)) { + return matches.keepRowsFromEqualityDeletes(open(task, requiredSchema, idToConstant)).iterator(); + } else { + return matches.keepRowsFromPosDeletes(open(task, requiredSchema, idToConstant)).iterator(); + } } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRewriter.java new file mode 100644 index 000000000000..6bd3f0dd2c1a --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRewriter.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.SortedPosDeleteWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.TaskContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EqualityDeleteRewriter implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(EqualityDeleteRewriter.class); + private final PartitionSpec spec; + private final Schema schema; + private final Broadcast io; + private final Broadcast encryptionManager; + private final LocationProvider locations; + private final boolean caseSensitive; + private final FileFormat format; + private final Table table; + + + public EqualityDeleteRewriter(Table table, boolean caseSensitive, + Broadcast io, Broadcast encryptionManager) { + this.table = table; + this.spec = table.spec(); + this.schema = table.schema(); + this.io = io; + this.encryptionManager = encryptionManager; + this.locations = table.locationProvider(); + this.caseSensitive = caseSensitive; + String formatString = table.properties().getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + } + + public Set toPosDeletes(JavaRDD> taskRDD) { + JavaRDD> dataFilesRDD = taskRDD.map(this::toPosDeletes); + + return dataFilesRDD.collect().stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + public Set toPosDeletes(Pair task) throws Exception { + TaskContext context = TaskContext.get(); + int partitionId = context.partitionId(); + long taskId = context.taskAttemptId(); + + Schema metaSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION); + Schema expectedSchema = TypeUtil.join(metaSchema, schema); + + DeleteRowReader deleteRowReader = new DeleteRowReader(task.second(), table, expectedSchema, caseSensitive, + FileContent.EQUALITY_DELETES); + + StructType structType = SparkSchemaUtil.convert(schema); + SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, schema, structType) + .spec(spec) + .build(); + + OutputFileFactory fileFactory = new OutputFileFactory( + spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); + + SortedPosDeleteWriter posDeleteWriter = + new 
SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, task.first()); + + try { + while (deleteRowReader.next()) { + InternalRow row = deleteRowReader.get(); + posDeleteWriter.delete(row.getString(0), row.getLong(1)); + } + + deleteRowReader.close(); + deleteRowReader = null; + + return Sets.newHashSet(posDeleteWriter.complete()); + + } catch (Throwable originalThrowable) { + try { + LOG.error("Aborting task", originalThrowable); + context.markTaskFailed(originalThrowable); + + LOG.error("Aborting commit for partition {} (task {}, attempt {}, stage {}.{})", + partitionId, taskId, context.attemptNumber(), context.stageId(), context.stageAttemptNumber()); + if (deleteRowReader != null) { + deleteRowReader.close(); + } + + // clean up files created by this writer + Tasks.foreach(Iterables.concat(posDeleteWriter.complete())) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.value().deleteFile(file.path().toString())); + + LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage {}.{})", + partitionId, taskId, context.taskAttemptId(), context.stageId(), context.stageAttemptNumber()); + + } catch (Throwable inner) { + if (originalThrowable != inner) { + originalThrowable.addSuppressed(inner); + LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner); + } + } + + if (originalThrowable instanceof Exception) { + throw originalThrowable; + } else { + throw new RuntimeException(originalThrowable); + } + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 6d4bf8ec3933..f17d8e1d09b8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataTask; @@ -223,6 +224,16 @@ protected class SparkDeleteFilter extends DeleteFilter { this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); } + @Override + protected Consumer deleteMarker() { + return record -> record.setBoolean(deleteMarkerIndex(), true); + } + + @Override + protected boolean isDeletedRow(InternalRow row) { + return row.getBoolean(deleteMarkerIndex()); + } + @Override protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction.java new file mode 100644 index 000000000000..f48ec6c6774f --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.actions.ConvertEqDeletesStrategy; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public abstract class TestRewriteDeletesAction extends SparkTestBase { + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + private static final Schema SCHEMA = new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private String tableLocation = null; + + @Before + public void setupTableLocation() throws Exception { + File tableDir = temp.newFolder(); + this.tableLocation = tableDir.toURI().toString(); + } + + public abstract ActionsProvider actionsProvider(); + + @Test + public void testRewriteDeletesWithNoEqDeletes() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .build(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + 
writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + table.refresh(); + + AssertHelpers.assertThrows("should fail execute", + IllegalArgumentException.class, "Files to delete cannot be null or empty", + () -> actionsProvider().rewriteDeletes(table).execute() + ); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records1); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDeletesWithEqDeletesNoMatch() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .build(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + table.refresh(); + + OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + List equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("c3"); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, null); + + Record partitionRecord = GenericRecord.create(table.schema().select("c1")) + .copy(ImmutableMap.of("c1", 1)); + + Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c3", "AA")); + + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter( + createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory), + FileFormat.PARQUET, + createPartitionKey(table, partitionRecord)); + + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.delete(record); + } + + table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit(); + + AssertHelpers.assertThrows("should fail execute", + IllegalArgumentException.class, "Files to delete cannot be null or empty", + () -> actionsProvider().rewriteDeletes(table).strategy(ConvertEqDeletesStrategy.class.getName()).execute() + ); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records1); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDeletesUnpartitionedTable() throws IOException { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + 
TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + + table.refresh(); + + OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + List equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("c3"); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, null); + + Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c3", "AAAA")); + + EqualityDeleteWriter eqDeleteWriter = appenderFactory.newEqDeleteWriter( + createEncryptedOutputFile(createPartitionKey(table, record), fileFactory), + FileFormat.PARQUET, + createPartitionKey(table, record)); + + try (EqualityDeleteWriter closeableWriter = eqDeleteWriter) { + closeableWriter.delete(record); + } + + table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit(); + + actionsProvider().rewriteDeletes(table).strategy(ConvertEqDeletesStrategy.class.getName()).execute(); + + CloseableIterable tasks = CloseableIterable.filter( + table.newScan().planFiles(), task -> task.deletes().stream() + .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)) + ); + Assert.assertFalse("Should not contain any equality deletes", tasks.iterator().hasNext()); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(records1.get(1)); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDeletesInPartitionedTable() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .build(); + Map options = Maps.newHashMap(); + BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation); + TableOperations ops = table.operations(); + ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2)); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + table.refresh(); + + OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + List equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId()); + Schema eqDeleteRowSchema = table.schema().select("c3"); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + ArrayUtil.toIntArray(equalityFieldIds), + eqDeleteRowSchema, null); + + Record partitionRecord = GenericRecord.create(table.schema().select("c1")) + 
+  @Test
+  public void testRewriteDeletesInPartitionedTable() throws IOException {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation);
+    TableOperations ops = table.operations();
+    ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2));
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+    table.refresh();
+
+    OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(),
+        table.io(), table.encryption(), 1, 1);
+
+    List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId());
+    Schema eqDeleteRowSchema = table.schema().select("c3");
+    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(),
+        ArrayUtil.toIntArray(equalityFieldIds),
+        eqDeleteRowSchema, null);
+
+    Record partitionRecord = GenericRecord.create(table.schema().select("c1"))
+        .copy(ImmutableMap.of("c1", 1));
+
+    EqualityDeleteWriter<Record> eqDeleteWriter = appenderFactory.newEqDeleteWriter(
+        createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory),
+        FileFormat.PARQUET,
+        createPartitionKey(table, partitionRecord));
+
+    Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c3", "AAAA"));
+    try (EqualityDeleteWriter<Record> closeableWriter = eqDeleteWriter) {
+      closeableWriter.delete(record);
+    }
+
+    table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit();
+
+    actionsProvider().rewriteDeletes(table).strategy(ConvertEqDeletesStrategy.class.getName()).execute();
+
+    CloseableIterable<FileScanTask> tasks = CloseableIterable.filter(
+        table.newScan().planFiles(), task -> task.deletes().stream()
+            .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+    Assert.assertFalse("Should not contain any equality deletes", tasks.iterator().hasNext());
+
+    table.refresh();
+    CloseableIterable<FileScanTask> newTasks = table.newScan().ignoreResiduals().planFiles();
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
+    newTasks.forEach(task -> {
+      deleteFiles.addAll(task.deletes());
+    });
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.add(records1.get(1));
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
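+  // Mixes one equality delete (partition c1 = 1) with one position delete (partition c1 = 3) and
+  // checks the action Result: the equality delete is reported as removed and exactly one new
+  // position delete is reported as added.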
+  @Test
+  public void testRewriteDeletesWithDeletes() throws IOException {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation);
+    TableOperations ops = table.operations();
+    ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2));
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE")
+    );
+    writeRecords(records3);
+    table.refresh();
+
+    DataFile fileForPosDelete = table.currentSnapshot().addedFiles().iterator().next();
+
+    OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), FileFormat.PARQUET, table.locationProvider(),
+        table.io(), table.encryption(), 1, 1);
+
+    List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField("c3").fieldId());
+    Schema eqDeleteRowSchema = table.schema().select("c3");
+    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(),
+        ArrayUtil.toIntArray(equalityFieldIds),
+        eqDeleteRowSchema, null);
+
+    // write equality delete
+    Record partitionRecord = GenericRecord.create(table.schema().select("c1")).copy(ImmutableMap.of("c1", 1));
+    EncryptedOutputFile file = createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory);
+    EqualityDeleteWriter<Record> eqDeleteWriter = appenderFactory.newEqDeleteWriter(
+        file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord));
+    Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of("c3", "AAAA"));
+    try (EqualityDeleteWriter<Record> closeableWriter = eqDeleteWriter) {
+      closeableWriter.delete(record);
+    }
+    table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit();
+
+    // write positional delete
+    partitionRecord = partitionRecord.copy(ImmutableMap.of("c1", 3));
+    file = createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory);
+    PositionDeleteWriter<Record> posDeleteWriter = appenderFactory.newPosDeleteWriter(
+        file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord));
+    posDeleteWriter.delete(fileForPosDelete.path(), 0);
+    posDeleteWriter.close();
+    table.newRowDelta().addDeletes(posDeleteWriter.toDeleteFile()).commit();
+
+    RewriteDeletes.Result result = actionsProvider().rewriteDeletes(table).execute();
+    Assert.assertEquals("Should contain one equality delete to delete", 1, result.deletedFiles().size());
+    Assert.assertEquals("Should contain one position delete to add", 1, result.addedFiles().size());
+
+    CloseableIterable<FileScanTask> tasks = CloseableIterable.filter(
+        table.newScan().planFiles(), task -> task.deletes().stream()
+            .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+    Assert.assertFalse("Should not contain any equality deletes", tasks.iterator().hasNext());
+
+    table.refresh();
+    CloseableIterable<FileScanTask> newTasks = table.newScan().ignoreResiduals().planFiles();
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
+    newTasks.forEach(task -> {
+      deleteFiles.addAll(task.deletes());
+    });
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.add(records1.get(1));
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+  }
+
+  private void writeDF(Dataset<Row> df) {
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+  }
+
+  private PartitionKey createPartitionKey(Table table, Record record) {
+    if (table.spec().isUnpartitioned()) {
+      return null;
+    }
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  private EncryptedOutputFile createEncryptedOutputFile(PartitionKey partition, OutputFileFactory fileFactory) {
+    if (partition == null) {
+      return fileFactory.newOutputFile();
+    } else {
+      return fileFactory.newOutputFile(partition);
+    }
+  }
+
+}
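The tests above only ever drive the new action through the ActionsProvider, so the reviewer-facing call pattern is short. A minimal sketch of that pattern, assuming a format-v2 table that already carries equality deletes and the Spark provider wired up by the subclasses at the end of this patch:

    // Convert existing equality deletes to position deletes and inspect the summary.
    RewriteDeletes.Result result = SparkActions.get()
        .rewriteDeletes(table)
        .strategy(ConvertEqDeletesStrategy.class.getName())
        .execute();
    result.deletedFiles();  // equality delete files that were replaced
    result.addedFiles();    // position delete files written in their place

Note that the mixed-deletes test calls execute() without naming a strategy, so the explicit strategy(...) call is optional and a default is used when it is omitted.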
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 1306491796c9..5a8e774e4fe7 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -49,6 +50,8 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.Dataset;
@@ -205,7 +208,8 @@ public void testReadEqualityDeleteRows() throws IOException {
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
 
     for (CombinedScanTask task : tasks) {
-      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.EQUALITY_DELETES)) {
         while (reader.next()) {
           actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
         }
@@ -215,4 +219,96 @@
     Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size());
     Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
   }
+
+  @Test
+  public void testReadPositionDeleteRows() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 6L) // id = 122
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expectedRowSet = rowSetWithIds(29, 89, 122);
+
+    Types.StructType type = table.schema().asStruct();
+    StructLikeSet actualRowSet = StructLikeSet.create(type);
+
+    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+        table.newScan().planFiles(),
+        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    for (CombinedScanTask task : tasks) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.POSITION_DELETES)) {
+        while (reader.next()) {
+          actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
+        }
+      }
+    }
+
+    Assert.assertEquals("should include 3 deleted row", 3, actualRowSet.size());
+    Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
+  }
+
+  @Test
+  public void testReadDeleteRows() throws IOException {
+    Schema deleteSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "b"), // id = 43
+        dataDelete.copy("data", "f"), // id = 121
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDelete = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 6L) // id = 122
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .addDeletes(eqDelete)
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expectedRowSet = rowSetWithIds(29, 43, 89, 121, 122);
+
+    Types.StructType type = table.schema().asStruct();
+    StructLikeSet actualRowSet = StructLikeSet.create(type);
+
+    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+        table.newScan().planFiles(),
+        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    for (CombinedScanTask task : tasks) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false, null)) {
+        while (reader.next()) {
+          actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
+        }
+      }
+    }
+
+    Assert.assertEquals("should include 5 deleted row", 5, actualRowSet.size());
+    Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
+  }
 }
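On the read side, DeleteRowReader generalizes the old EqualityDeleteRowReader: the extra FileContent argument selects which category of deleted rows the reader surfaces. A rough sketch of the three modes the new tests exercise, reusing the constructor arguments shown above:

    // Rows removed by equality deletes only.
    new DeleteRowReader(task, table, table.schema(), false, FileContent.EQUALITY_DELETES);
    // Rows removed by position deletes only.
    new DeleteRowReader(task, table, table.schema(), false, FileContent.POSITION_DELETES);
    // A null filter surfaces rows removed by either kind of delete.
    new DeleteRowReader(task, table, table.schema(), false, null);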
diff --git a/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction24.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction24.java
new file mode 100644
index 000000000000..820ec4d23d7d
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction24.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.spark.actions.SparkActions;
+
+public class TestRewriteDeletesAction24 extends TestRewriteDeletesAction {
+  @Override
+  public ActionsProvider actionsProvider() {
+    return SparkActions.get();
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction3.java b/spark3/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction3.java
new file mode 100644
index 000000000000..591b0406fd6a
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/actions/TestRewriteDeletesAction3.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.spark.actions.SparkActions;
+
+public class TestRewriteDeletesAction3 extends TestRewriteDeletesAction {
+  @Override
+  public ActionsProvider actionsProvider() {
+    return SparkActions.get();
+  }
+}
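The two classes above only bind the shared TestRewriteDeletesAction body to the Spark 2.4 and Spark 3 action providers. The post-rewrite check that body repeats is a plain scan-planning filter, which is also a convenient way to verify the conversion on a real table; a sketch, assuming a refreshed table handle and the same imports the tests use:

    // After RewriteDeletes with ConvertEqDeletesStrategy, no task should reference an equality delete.
    CloseableIterable<FileScanTask> withEqDeletes = CloseableIterable.filter(
        table.newScan().planFiles(),
        task -> task.deletes().stream()
            .anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES)));
    Assert.assertFalse("Should not contain any equality deletes", withEqDeletes.iterator().hasNext());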