diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
index 3b4c5e57e31e..6e972313cd78 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -53,6 +54,7 @@ abstract class ManifestFilterManager<F extends ContentFile<F>> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class);
   private static final Joiner COMMA = Joiner.on(",");
+  private static final AtomicLong MIN_SEQ_CONST = new AtomicLong(Long.MIN_VALUE);
 
   protected static class DeleteException extends ValidationException {
     private final String partition;
@@ -78,6 +80,7 @@ public String partition() {
   private boolean failMissingDeletePaths = false;
   private int duplicateDeleteCount = 0;
   private boolean caseSensitive = true;
+  private boolean dropPartitionDeleteEnabled = false;
 
   // cache filtered manifests to avoid extra work when commits fail.
   private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
@@ -85,6 +88,8 @@ public String partition() {
   // tracking where files were deleted to validate retries quickly
   private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
       Maps.newConcurrentMap();
+  private Map<Pair<Integer, StructLike>, AtomicLong> minSequenceNumberByPartition =
+      Maps.newConcurrentMap();
 
   private final Supplier<ExecutorService> workerPoolSupplier;
@@ -110,6 +115,15 @@ protected void failMissingDeletePaths() {
     this.failMissingDeletePaths = true;
   }
 
+  protected void setDropPartitionDelete(boolean dropPartitionDelete) {
+    this.dropPartitionDeleteEnabled = dropPartitionDelete;
+  }
+
+  public void setMinSequenceNumberByPartition(
+      Map<Pair<Integer, StructLike>, AtomicLong> minSequenceNumberByPartition) {
+    this.minSequenceNumberByPartition = minSequenceNumberByPartition;
+  }
+
   /**
    * Add a filter to match files to delete. A file will be deleted if all of the rows it contains
    * match this or any other filter passed to this method.
@@ -145,6 +159,24 @@ protected void dropDeleteFilesOlderThan(long sequenceNumber) {
     this.minSequenceNumber = sequenceNumber;
   }
 
+  protected void recordPartitionMinDataSequenceNumber(
+      int specId, StructLike partition, long sequenceNumber) {
+    Preconditions.checkArgument(
+        sequenceNumber >= 0, "Invalid minimum data sequence number: %s", sequenceNumber);
+
+    Pair<Integer, StructLike> par = Pair.of(specId, partition);
+    minSequenceNumberByPartition.compute(
+        par,
+        (key, currentMin) -> {
+          if (currentMin == null) {
+            return new AtomicLong(sequenceNumber);
+          } else {
+            currentMin.updateAndGet(min -> Math.min(min, sequenceNumber));
+            return currentMin;
+          }
+        });
+  }
+
   void caseSensitive(boolean newCaseSensitive) {
     this.caseSensitive = newCaseSensitive;
   }
@@ -289,6 +321,29 @@ private void invalidateFilteredCache() {
     cleanUncommitted(SnapshotProducer.EMPTY_SET);
   }
 
+  private void recordPartitionMinDataSequenceNumber(ManifestFile manifest) {
+    if (dropPartitionDeleteEnabled && manifest.content() != ManifestContent.DELETES) {
+      try (ManifestReader<F> reader = newManifestReader(manifest)) {
+        reader
+            .entries()
+            .forEach(
+                entry -> {
+                  F dataFile = entry.file();
+                  long dataFileSequence = dataFile.dataSequenceNumber();
+                  if (entry.status() == ManifestEntry.Status.DELETED
+                      || dataFileSequence == ManifestWriter.UNASSIGNED_SEQ) {
+                    // ignore entries that are already deleted or have no assigned data sequence
+                    return;
+                  }
+                  recordPartitionMinDataSequenceNumber(
+                      dataFile.specId(), dataFile.partition(), dataFileSequence);
+                });
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+      }
+    }
+  }
+
   /** @return a ManifestReader that is a filtered version of the input manifest. */
   private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
     ManifestFile cached = filteredManifests.get(manifest);
@@ -296,6 +351,8 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
       return cached;
     }
 
+    recordPartitionMinDataSequenceNumber(manifest);
+
     boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
     if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
       filteredManifests.put(manifest, manifest);
@@ -360,14 +417,30 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
     return canContainExpressionDeletes
         || canContainDroppedPartitions
         || canContainDroppedFiles
-        || canContainDropBySeq;
+        || canContainDropBySeq
+        || (dropPartitionDeleteEnabled && canContainDropByPartitionSeq(manifest));
+  }
+
+  private boolean canContainDropByPartitionSeq(ManifestFile manifest) {
+    // A delete file in a delete manifest can only be dropped if its data sequence number is
+    // lower than the minimum data sequence number of the live data files in its partition.
+    // Conversely, if the manifest's min sequence number is greater than the largest of the
+    // per-partition minimums, the manifest cannot contain any delete file that could be removed.
+    return dropPartitionDeleteEnabled
+        && manifest.content() == ManifestContent.DELETES
+        && !(manifest.minSequenceNumber()
+            > this.minSequenceNumberByPartition.values().stream()
+                .mapToLong(AtomicLong::get)
+                .max()
+                .orElse(Long.MIN_VALUE));
   }
 
   @SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
   private boolean manifestHasDeletedFiles(
       PartitionAndMetricsEvaluator evaluator, ManifestReader<F> reader) {
     boolean isDelete = reader.isDeleteManifestReader();
-
     for (ManifestEntry<F> entry : reader.liveEntries()) {
       F file = entry.file();
       boolean markedForDelete =
@@ -376,7 +449,13 @@ private boolean manifestHasDeletedFiles(
               || (isDelete
                   && entry.isLive()
                   && entry.dataSequenceNumber() > 0
-                  && entry.dataSequenceNumber() < minSequenceNumber);
+                  && (entry.dataSequenceNumber() < minSequenceNumber
+                      || (dropPartitionDeleteEnabled
+                          && entry.dataSequenceNumber()
+                              < this.minSequenceNumberByPartition
+                                  .getOrDefault(
+                                      Pair.of(file.specId(), file.partition()), MIN_SEQ_CONST)
+                                  .get())));
 
       if (markedForDelete || evaluator.rowsMightMatch(file)) {
         boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
@@ -409,7 +488,6 @@ private ManifestFile filterManifestWithDeletedFiles(
     // manifest. produce a copy of the manifest with all deleted files removed.
     List<F> deletedFiles = Lists.newArrayList();
     Set<CharSequence> deletedPaths = Sets.newHashSet();
-
     try {
       ManifestWriter<F> writer = newManifestWriter(reader.spec());
       try {
@@ -424,7 +502,14 @@
                 || (isDelete
                     && entry.isLive()
                     && entry.dataSequenceNumber() > 0
-                    && entry.dataSequenceNumber() < minSequenceNumber);
+                    && (entry.dataSequenceNumber() < minSequenceNumber
+                        || (dropPartitionDeleteEnabled
+                            && entry.dataSequenceNumber()
+                                < minSequenceNumberByPartition
+                                    .getOrDefault(
+                                        Pair.of(file.specId(), file.partition()),
+                                        MIN_SEQ_CONST)
+                                    .get())));
             if (entry.status() != ManifestEntry.Status.DELETED) {
               if (markedForDelete || evaluator.rowsMightMatch(file)) {
                 boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 5d3ec6e35f0d..dcb831d031a2 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TableProperties.DROP_PARTITION_DELETE_ENABLED;
+import static org.apache.iceberg.TableProperties.DROP_PARTITION_DELETE_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
 import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT;
 import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
@@ -30,6 +32,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -77,6 +80,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final ManifestFilterManager<DataFile> filterManager;
   private final ManifestMergeManager<DeleteFile> deleteMergeManager;
   private final ManifestFilterManager<DeleteFile> deleteFilterManager;
+  private final boolean dropPartitionDeleteEnabled;
 
   // update data
   private final List<DataFile> newDataFiles = Lists.newArrayList();
@@ -119,6 +123,15 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     this.deleteMergeManager =
         new DeleteFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
     this.deleteFilterManager = new DeleteFileFilterManager();
+    this.dropPartitionDeleteEnabled =
+        ops.current()
+            .propertyAsBoolean(
+                DROP_PARTITION_DELETE_ENABLED, DROP_PARTITION_DELETE_ENABLED_DEFAULT);
+    this.deleteFilterManager.setDropPartitionDelete(dropPartitionDeleteEnabled);
+    this.filterManager.setDropPartitionDelete(dropPartitionDeleteEnabled);
+    Map<Pair<Integer, StructLike>, AtomicLong> seqByPartMap = Maps.newConcurrentMap();
+    this.deleteFilterManager.setMinSequenceNumberByPartition(seqByPartMap);
+    this.filterManager.setMinSequenceNumberByPartition(seqByPartMap);
   }
 
   @Override
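To summarize the pruning rule the hunks above implement, here is a minimal, self-contained sketch (not the Iceberg API; the class, method names, and string partition keys are hypothetical): the filter manager tracks the smallest data sequence number among the live data files of each partition, and a delete file whose own data sequence number is strictly below that minimum can no longer match any row, so it may be dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the pruning rule: an equality delete with data sequence number D applies
// only to data files with a lower sequence number, so once every live data file in its partition
// has a sequence number above D, the delete file can never match a row and may be dropped.
public class PartitionDeletePruningSketch {
  // partition key -> minimum data sequence number among the live data files in that partition
  private final Map<String, Long> minDataSeqByPartition = new ConcurrentHashMap<>();

  // record a live data file, keeping only the per-partition minimum sequence number
  public void recordDataFile(String partition, long dataSequenceNumber) {
    minDataSeqByPartition.merge(partition, dataSequenceNumber, Math::min);
  }

  // mirrors the patch's check: drop the delete file when its sequence number is strictly below
  // the smallest data sequence number recorded for its partition
  public boolean isDroppable(String partition, long deleteSequenceNumber) {
    long minDataSeq = minDataSeqByPartition.getOrDefault(partition, Long.MIN_VALUE);
    return deleteSequenceNumber < minDataSeq;
  }

  public static void main(String[] args) {
    PartitionDeletePruningSketch sketch = new PartitionDeletePruningSketch();
    sketch.recordDataFile("c1=1", 5L); // rewritten partition, new data file at sequence 5
    sketch.recordDataFile("c1=2", 2L); // untouched partition, data file at sequence 2
    System.out.println(sketch.isDroppable("c1=1", 3L)); // true: 3 < 5, delete can be dropped
    System.out.println(sketch.isDroppable("c1=2", 4L)); // false: 4 >= 2, delete still applies
  }
}
```

The patch keeps an `AtomicLong` per partition so concurrent manifest scans can lower the minimum in place; the sketch uses `Map.merge` only for brevity.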
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 2267ba03fd7b..349f16d81cc6 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -117,6 +117,10 @@ private TableProperties() {}
   public static final String MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled";
   public static final boolean MANIFEST_MERGE_ENABLED_DEFAULT = true;
 
+  public static final String DROP_PARTITION_DELETE_ENABLED = "commit.drop-partition-delete.enabled";
+
+  // TODO: turn back to false after review
+  public static final boolean DROP_PARTITION_DELETE_ENABLED_DEFAULT = true;
   public static final String DEFAULT_FILE_FORMAT = "write.format.default";
   public static final String DELETE_DEFAULT_FILE_FORMAT = "write.delete.format.default";
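The new `commit.drop-partition-delete.enabled` property gates the behavior per table; the test below enables it via `updateProperties()`. A hedged sketch of turning it on for an existing table outside the test (the table location and the use of `HadoopTables` are illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class EnableDropPartitionDelete {
  public static void main(String[] args) {
    // load an existing table by location; HadoopTables and the path are placeholders
    Table table = new HadoopTables(new Configuration()).load("file:///tmp/warehouse/db/tbl");

    // opt a single table in to dropping partition-scoped delete files during commits
    table.updateProperties().set("commit.drop-partition-delete.enabled", "true").commit();
  }
}
```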
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDeleteFilesAction.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDeleteFilesAction.java
new file mode 100644
index 000000000000..0c847b4672fe
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDeleteFilesAction.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.DROP_PARTITION_DELETE_ENABLED;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteDataFiles.Result;
+import org.apache.iceberg.actions.SizeBasedFileRewriter;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestRewriteDeleteFilesAction extends SparkTestBase {
+
+  private static final int SCALE = 400000;
+
+  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+  private static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  PartitionSpec partitionSpecC1 = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  private String tableLocation = null;
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    File tableDir = temp.newFolder();
+    this.tableLocation = tableDir.toURI().toString();
+  }
+
+  private Result rewriteTable(Table table) {
+    return actions()
+        .rewriteDataFiles(table)
+        .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
+        .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .execute();
+  }
+
+  @Test
+  public void testRewritePartitionDeletesShouldNotRetain() throws IOException {
+    // TODO: another case with global equality deletes
+    BaseTable table = createV2PartitionTable(partitionSpecC1);
+    table.updateProperties().set(DROP_PARTITION_DELETE_ENABLED, "true").commit();
+    // write partition 1 data, data sequence 1
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
+    writeRecords(records1);
+
+    // write partition 2 data, data sequence 2
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(
+            new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+            new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD"));
+    writeRecords(records2);
+
+    // write an equality delete for partition 1, data sequence 3
+    writeEqDeleteRecord(table, "c1", 1, "c3", "AAAA");
+    // write an equality delete for partition 2, data sequence 4
+    writeEqDeleteRecord(table, "c1", 2, "c3", "CCCC");
+
+    // rewrite only the c1=1 partition: its data is rewritten and the new data file gets data
+    // sequence 5. During this commit the recorded minimum data sequence for partition 1 is still 1
+    // (from the file being replaced), which is below the delete file's sequence of 3, so the
+    // delete file is retained
+    actions()
+        .rewriteDataFiles(table)
+        .filter(Expressions.equal("c1", 1))
+        .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .execute();
+
+    // rewrite the c1=1 partition again: its only data file now has sequence 5, so the partition's
+    // minimum data sequence is 5. The old delete file (sequence 3) can no longer apply to any data
+    // in the partition and should be removed
+    actions()
+        .rewriteDataFiles(table)
+        .filter(Expressions.equal("c1", 1))
+        .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .execute();
+
+    // after the second rewrite of c1=1, only the equality delete for partition 2 should remain
+    table.refresh();
+    Map<String, String> commitSummary = table.currentSnapshot().summary();
+    Assert.assertEquals("1", commitSummary.get("total-equality-deletes"));
+  }
+
+  private void writeEqDeleteRecord(
+      BaseTable table, String partCol, Object partVal, String delCol, Object delVal) {
+    List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField(delCol).fieldId());
+    Schema eqDeleteRowSchema = table.schema().select(delCol);
+    Record partitionRecord =
+        GenericRecord.create(table.schema().select(partCol))
+            .copy(ImmutableMap.of(partCol, partVal));
+    Record record = GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of(delCol, delVal));
+    writeEqDeleteRecord(table, equalityFieldIds, partitionRecord, eqDeleteRowSchema, record);
+  }
+
+  private void writeEqDeleteRecord(
+      BaseTable table,
+      List<Integer> equalityFieldIds,
+      Record partitionRecord,
+      Schema eqDeleteRowSchema,
+      Record deleteRecord) {
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
+    GenericAppenderFactory appenderFactory =
+        new GenericAppenderFactory(
+            table.schema(),
+            table.spec(),
+            ArrayUtil.toIntArray(equalityFieldIds),
+            eqDeleteRowSchema,
+            null);
+
+    EncryptedOutputFile file =
+        createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory);
+    EqualityDeleteWriter<Record> eqDeleteWriter =
+        appenderFactory.newEqDeleteWriter(
+            file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord));
+
+    try (EqualityDeleteWriter<Record> clsEqDeleteWriter = eqDeleteWriter) {
+      // write the equality delete record
+      clsEqDeleteWriter.write(deleteRecord);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit();
+  }
+
+  private void writePosDeleteRecord(
+      BaseTable table, String partCol, Object partVal, CharSequence path, int pos) {
+    Record partitionRecord =
+        GenericRecord.create(table.schema().select(partCol))
+            .copy(ImmutableMap.of(partCol, partVal));
+    writePosDeleteRecord(table, partitionRecord, path, pos);
+  }
+
+  private void writePosDeleteRecord(
+      BaseTable table, Record partitionRecord, CharSequence path, int pos) {
+    GenericAppenderFactory appenderFactory =
+        new GenericAppenderFactory(table.schema(), table.spec(), null, null, null);
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
+    EncryptedOutputFile file =
+        createEncryptedOutputFile(createPartitionKey(table, partitionRecord), fileFactory);
+    PositionDeleteWriter<Record> posDeleteWriter =
+        appenderFactory.newPosDeleteWriter(
+            file, FileFormat.PARQUET, createPartitionKey(table, partitionRecord));
+    PositionDelete<Record> positionDeleteRecord =
+        PositionDelete.<Record>create().set(path, pos, null);
+    try (PositionDeleteWriter<Record> clsPosDeleteWriter = posDeleteWriter) {
+      clsPosDeleteWriter.write(positionDeleteRecord);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    table.newRowDelta().addDeletes(posDeleteWriter.toDeleteFile()).commit();
+    table.refresh();
+  }
+
+  private BaseTable createV2PartitionTable(PartitionSpec spec) {
+    Map<String, String> options = Maps.newHashMap();
+    BaseTable table = (BaseTable) TABLES.create(SCHEMA, spec, options, tableLocation);
+    TableOperations ops = table.operations();
+    ops.commit(ops.current(), ops.current().upgradeToFormatVersion(2));
+    return table;
+  }
+
+  protected List<Object[]> currentData() {
+    return rowsToJava(
+        spark.read().format("iceberg").load(tableLocation).sort("c1", "c2", "c3").collectAsList());
+  }
+
+  protected long testDataSize(Table table) {
+    return Streams.stream(table.newScan().planFiles()).mapToLong(FileScanTask::length).sum();
+  }
+
+  protected void shouldHaveFiles(Table table, int numExpected) {
+    table.refresh();
+    int numFiles = Iterables.size(table.newScan().planFiles());
+    Assert.assertEquals("Did not have the expected number of files", numExpected, numFiles);
+  }
+
+  protected Table createTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(20 * 1024))
+        .commit();
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+    table.refresh();
+    return table;
+  }
+
+  /**
+   * Create a table and write a certain number of data files to it.
+   *
+   * @param files number of files to create
+   * @return the created table
+   */
+  protected Table createTable(int files) {
+    Table table = createTable();
+    writeRecords(files, SCALE);
+    return table;
+  }
+
+  protected Table createTablePartitioned(
+      int partitions, int files, int numRecords, Map<String, String> options) {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+    table.refresh();
+    writeRecords(files, numRecords, partitions);
+    return table;
+  }
+
+  protected Table createTablePartitioned(int partitions, int files) {
+    return createTablePartitioned(partitions, files, SCALE, Maps.newHashMap());
+  }
+
+  private void writeRecords(int files, int numRecords) {
+    writeRecords(files, numRecords, 0);
+  }
+
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+  }
+
+  private PartitionKey createPartitionKey(Table table, Record record) {
+    if (table.spec().isUnpartitioned()) {
+      return null;
+    }
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  private EncryptedOutputFile createEncryptedOutputFile(
+      PartitionKey partition, OutputFileFactory fileFactory) {
+    if (partition == null) {
+      return fileFactory.newOutputFile();
+    } else {
+      return fileFactory.newOutputFile(partition);
+    }
+  }
+
+  private void writeRecords(int files, int numRecords, int partitions) {
+    List<ThreeColumnRecord> records = Lists.newArrayList();
+    int rowDimension = (int) Math.ceil(Math.sqrt(numRecords));
+    List<Pair<Integer, Integer>> data =
+        IntStream.range(0, rowDimension)
+            .boxed()
+            .flatMap(x -> IntStream.range(0, rowDimension).boxed().map(y -> Pair.of(x, y)))
+            .collect(Collectors.toList());
+    Collections.shuffle(data, new Random(42));
+    if (partitions > 0) {
+      data.forEach(
+          i ->
+              records.add(
+                  new ThreeColumnRecord(
+                      i.first() % partitions, "foo" + i.first(), "bar" + i.second())));
+    } else {
+      data.forEach(
+          i ->
+              records.add(new ThreeColumnRecord(i.first(), "foo" + i.first(), "bar" + i.second())));
+    }
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).repartition(files);
+    writeDF(df);
+  }
+
+  private void writeDF(Dataset<Row> df) {
+    df.select("c1", "c2", "c3")
+        .sortWithinPartitions("c1", "c2")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+  }
+
+  private List<DeleteFile> writePosDeletesToFile(
+      Table table, DataFile dataFile, int outputDeleteFiles) {
+    return writePosDeletes(
+        table, dataFile.partition(), dataFile.path().toString(), outputDeleteFiles);
+  }
+
+  private List<DeleteFile> writePosDeletes(
+      Table table, StructLike partition, String path, int outputDeleteFiles) {
+    List<DeleteFile> results = Lists.newArrayList();
+    int rowPosition = 0;
+    for (int file = 0; file < outputDeleteFiles; file++) {
+      OutputFile outputFile =
+          table
+              .io()
+              .newOutputFile(
+                  table.locationProvider().newDataLocation(UUID.randomUUID().toString()));
+      EncryptedOutputFile encryptedOutputFile =
+          EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY);
+
+      GenericAppenderFactory appenderFactory =
+          new GenericAppenderFactory(table.schema(), table.spec(), null, null, null);
+      PositionDeleteWriter<Record> posDeleteWriter =
+          appenderFactory
+              .set(TableProperties.DEFAULT_WRITE_METRICS_MODE, "full")
+              .newPosDeleteWriter(encryptedOutputFile, FileFormat.PARQUET, partition);
+
+      PositionDelete<Record> posDelete = PositionDelete.create();
+      posDeleteWriter.write(posDelete.set(path, rowPosition, null));
+      try {
+        posDeleteWriter.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+
+      results.add(posDeleteWriter.toDeleteFile());
+      rowPosition++;
+    }
+
+    return results;
+  }
+
+  private SparkActions actions() {
+    return SparkActions.get();
+  }
+}
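For reference, the test's final assertion reads the `total-equality-deletes` counter from the snapshot summary. A rough sketch of inspecting the same counters outside the test harness (the table location is hypothetical; `total-position-deletes` is the analogous counter for position deletes):

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class InspectDeleteCounts {
  public static void main(String[] args) {
    // table location is a placeholder
    Table table = new HadoopTables(new Configuration()).load("file:///tmp/warehouse/db/tbl");
    table.refresh();

    // the same snapshot-summary counters the test asserts on
    Map<String, String> summary = table.currentSnapshot().summary();
    System.out.println("total-equality-deletes = " + summary.get("total-equality-deletes"));
    System.out.println("total-position-deletes = " + summary.get("total-position-deletes"));
  }
}
```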