diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 264da911745a..999175e063ad 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -44,6 +44,7 @@
  */
 public class AllManifestsTable extends BaseMetadataTable {
   private static final Schema MANIFEST_FILE_SCHEMA = new Schema(
+      Types.NestedField.required(14, "content", Types.IntegerType.get()),
       Types.NestedField.required(1, "path", Types.StringType.get()),
       Types.NestedField.required(2, "length", Types.LongType.get()),
       Types.NestedField.optional(3, "partition_spec_id", Types.IntegerType.get()),
@@ -51,6 +52,9 @@ public class AllManifestsTable extends BaseMetadataTable {
       Types.NestedField.optional(5, "added_data_files_count", Types.IntegerType.get()),
       Types.NestedField.optional(6, "existing_data_files_count", Types.IntegerType.get()),
       Types.NestedField.optional(7, "deleted_data_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(15, "added_delete_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(16, "existing_delete_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(17, "deleted_delete_files_count", Types.IntegerType.get()),
       Types.NestedField.optional(8, "partition_summaries", Types.ListType.ofRequired(9, Types.StructType.of(
           Types.NestedField.required(10, "contains_null", Types.BooleanType.get()),
           Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index 9fad84f06170..85063c3cd173 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -31,6 +31,7 @@
  */
 public class ManifestsTable extends BaseMetadataTable {
   private static final Schema SNAPSHOT_SCHEMA = new Schema(
+      Types.NestedField.required(14, "content", Types.IntegerType.get()),
       Types.NestedField.required(1, "path", Types.StringType.get()),
       Types.NestedField.required(2, "length", Types.LongType.get()),
       Types.NestedField.required(3, "partition_spec_id", Types.IntegerType.get()),
@@ -38,6 +39,9 @@ public class ManifestsTable extends BaseMetadataTable {
       Types.NestedField.required(5, "added_data_files_count", Types.IntegerType.get()),
       Types.NestedField.required(6, "existing_data_files_count", Types.IntegerType.get()),
       Types.NestedField.required(7, "deleted_data_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(15, "added_delete_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(16, "existing_delete_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(17, "deleted_delete_files_count", Types.IntegerType.get()),
       Types.NestedField.required(8, "partition_summaries", Types.ListType.ofRequired(9, Types.StructType.of(
           Types.NestedField.required(10, "contains_null", Types.BooleanType.get()),
           Types.NestedField.optional(11, "contains_nan", Types.BooleanType.get()),
@@ -92,13 +96,17 @@ private class ManifestsTableScan extends StaticTableScan {
 
   static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest) {
     return StaticDataTask.Row.of(
+        manifest.content().id(),
         manifest.path(),
         manifest.length(),
         manifest.partitionSpecId(),
         manifest.snapshotId(),
-        manifest.addedFilesCount(),
-        manifest.existingFilesCount(),
-        manifest.deletedFilesCount(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
         partitionSummariesToRows(spec, manifest.partitions())
     );
   }
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 30bfb89a1048..f147016ebbd6 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -62,6 +62,8 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.ManifestContent.DATA;
+import static org.apache.iceberg.ManifestContent.DELETES;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -895,13 +897,18 @@ public void testManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", true)
@@ -1010,13 +1017,18 @@ public void testAllManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", false)
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 89764018ffc5..ea6aaae5319b 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -63,6 +63,8 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.ManifestContent.DATA;
+import static org.apache.iceberg.ManifestContent.DELETES;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -896,13 +898,18 @@ public void testManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", true)
@@ -1011,13 +1018,18 @@ public void testAllManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", false)
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 89764018ffc5..ea6aaae5319b 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -63,6 +63,8 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.ManifestContent.DATA;
+import static org.apache.iceberg.ManifestContent.DELETES;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -896,13 +898,18 @@ public void testManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", true)
@@ -1011,13 +1018,18 @@ public void testAllManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", false)
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c47c4c0a2db2..eb3ca569b6a3 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -26,20 +28,26 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -57,12 +65,15 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.ManifestContent.DATA;
+import static org.apache.iceberg.ManifestContent.DELETES;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -885,6 +896,23 @@ public void testManifestsTable() {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
+
     List<Row> actual = spark.read()
         .format("iceberg")
         .load(loadLocation(tableIdentifier, "manifests"))
@@ -897,16 +925,21 @@ public void testManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
            .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
-                    .set("contains_null", true)
+                    .set("contains_null", manifest.content() == DATA)
                     .set("contains_nan", false)
                     .set("lower_bound", "1")
                     .set("upper_bound", "1")
@@ -915,8 +948,9 @@ public void testManifestsTable() {
         .build()
     );
 
-    Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
+    Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
     TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(1), actual.get(1));
   }
 
   @Test
@@ -993,6 +1027,23 @@ public void testAllManifestsTable() {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
+
     manifests.addAll(table.currentSnapshot().allManifests());
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
@@ -1002,6 +1053,7 @@ public void testAllManifestsTable() {
     List<Row> actual = spark.read()
         .format("iceberg")
         .load(loadLocation(tableIdentifier, "all_manifests"))
+        .dropDuplicates("path")
         .orderBy("path")
         .collectAsList();
 
@@ -1012,13 +1064,18 @@ public void testAllManifestsTable() {
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
     List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder.set("path", manifest.path())
+        builder
+            .set("content", manifest.content().id())
+            .set("path", manifest.path())
             .set("length", manifest.length())
             .set("partition_spec_id", manifest.partitionSpecId())
             .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
+            .set("added_data_files_count", manifest.content() == DATA ? manifest.addedFilesCount() : 0)
+            .set("existing_data_files_count", manifest.content() == DATA ? manifest.existingFilesCount() : 0)
+            .set("deleted_data_files_count", manifest.content() == DATA ? manifest.deletedFilesCount() : 0)
+            .set("added_delete_files_count", manifest.content() == DELETES ? manifest.addedFilesCount() : 0)
+            .set("existing_delete_files_count", manifest.content() == DELETES ? manifest.existingFilesCount() : 0)
+            .set("deleted_delete_files_count", manifest.content() == DELETES ? manifest.deletedFilesCount() : 0)
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
                 summaryBuilder
                     .set("contains_null", false)
@@ -1032,7 +1089,7 @@ public void testAllManifestsTable() {
 
     expected.sort(Comparator.comparing(o -> o.get("path").toString()));
 
-    Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
+    Assert.assertEquals("Manifests table should have 4 manifest rows", 4, actual.size());
     for (int i = 0; i < expected.size(); i += 1) {
       TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
     }
@@ -1457,4 +1514,28 @@ private void asMetadataRecord(GenericData.Record file) {
     file.put(0, FileContent.DATA.id());
     file.put(3, 0); // specId
   }
+
+  private PositionDeleteWriter<InternalRow> newPositionDeleteWriter(Table table, PartitionSpec spec,
+                                                                    StructLike partition) {
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 0, 0).build();
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile(spec, partition);
+
+    SparkFileWriterFactory fileWriterFactory = SparkFileWriterFactory.builderFor(table).build();
+    return fileWriterFactory.newPositionDeleteWriter(outputFile, spec, partition);
+  }
+
+  private DeleteFile writePositionDeletes(Table table, PartitionSpec spec, StructLike partition,
+                                          Iterable<PositionDelete<InternalRow>> deletes) {
+    PositionDeleteWriter<InternalRow> positionDeleteWriter = newPositionDeleteWriter(table, spec, partition);
+
+    try (PositionDeleteWriter<InternalRow> writer = positionDeleteWriter) {
+      for (PositionDelete<InternalRow> delete : deletes) {
+        writer.write(delete);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return positionDeleteWriter.toDeleteFile();
+  }
 }
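
Usage note (reviewer illustration, not part of the patch): a minimal sketch of how the new manifests-table columns surface through the Spark DataFrame API once this change lands. The running SparkSession, catalog configuration, and the table name "db.sample" are assumptions for illustration only; the column names match the schema added above.

// Hypothetical example: reads the "manifests" metadata table and shows the new
// content column (0 = data manifest, 1 = delete manifest) alongside the separate
// data-file and delete-file counters introduced by this change.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ManifestsTableQueryExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("manifests-table-example")
        .getOrCreate();

    // "db.sample" is a placeholder table name; any Iceberg table works here.
    Dataset<Row> manifests = spark.read()
        .format("iceberg")
        .load("db.sample.manifests");

    manifests.select(
            "content",
            "path",
            "added_data_files_count",
            "existing_data_files_count",
            "deleted_data_files_count",
            "added_delete_files_count",
            "existing_delete_files_count",
            "deleted_delete_files_count")
        .show(false);
  }
}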