diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index 5403dfe19aae..6751caeb2892 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -29,8 +29,6 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.SerializableTable; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableUtil; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.FileRewritePlan; import org.apache.iceberg.actions.RewriteDataFiles; @@ -93,10 +91,6 @@ public DataFileRewritePlanner( @Override public void open(Configuration parameters) throws Exception { tableLoader.open(); - Table table = tableLoader.loadTable(); - Preconditions.checkArgument( - !TableUtil.supportsRowLineage(table), - "Flink does not support compaction on row lineage enabled tables (V3+)"); this.errorCounter = TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex) .counter(TableMaintenanceMetrics.ERROR_COUNTER); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java index c03b5cc1c8fd..1e8db128e9e1 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java @@ -28,11 +28,15 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.actions.RewriteFileGroup; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup; @@ -108,8 +112,10 @@ public void processElement(PlannedGroup value, Context ctx, Collector writer = writerFor(value)) { - try (DataIterator iterator = readerFor(value)) { + boolean preserveRowId = TableUtil.supportsRowLineage(value.table()); + + try (TaskWriter writer = writerFor(value, preserveRowId)) { + try (DataIterator iterator = readerFor(value, preserveRowId)) { while (iterator.hasNext()) { writer.write(iterator.next()); } @@ -169,30 +175,42 @@ public void processElement(PlannedGroup value, Context ctx, Collector writerFor(PlannedGroup value) { + private TaskWriter writerFor(PlannedGroup value, boolean preserveRowId) { String formatString = PropertyUtil.propertyAsString( value.table().properties(), TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + Schema writeSchema = + preserveRowId + ? 
MetadataColumns.schemaWithRowLineage(value.table().schema()) + : value.table().schema(); + RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema); RowDataTaskWriterFactory factory = new RowDataTaskWriterFactory( - value.table(), - FlinkSchemaUtil.convert(value.table().schema()), + value::table, + flinkWriteType, value.group().inputSplitSize(), FileFormat.fromString(formatString), value.table().properties(), null, - false); + false, + writeSchema, + value.table().spec()); factory.initialize(subTaskId, attemptId); return factory.create(); } - private DataIterator readerFor(PlannedGroup value) { + private DataIterator readerFor(PlannedGroup value, boolean preserveRowId) { + Schema projectedSchema = + preserveRowId + ? MetadataColumns.schemaWithRowLineage(value.table().schema()) + : value.table().schema(); + RowDataFileScanTaskReader reader = new RowDataFileScanTaskReader( value.table().schema(), - value.table().schema(), + projectedSchema, PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null), false, Collections.emptyList()); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index bf6f72cc287a..b8fb1ba32edf 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -46,7 +46,6 @@ import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PartitionUtil; @@ -84,13 +83,7 @@ public RowDataFileScanTaskReader( @Override public CloseableIterator open( FileScanTask task, InputFilesDecryptor inputFilesDecryptor) { - Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds()); - - Map idToConstant = - partitionSchema.columns().isEmpty() - ? 
ImmutableMap.of() - : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); - + Map idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor); CloseableIterable iterable = diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index d3146d3f4202..fc5bea17b4a2 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -89,6 +89,13 @@ private SimpleDataUtil() {} Types.NestedField.optional(2, "data", Types.StringType.get()), Types.NestedField.optional(3, "extra", Types.StringType.get())); + public static final Schema SCHEMA3 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "_row_id", Types.LongType.get()), + Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get())); + public static final ResolvedSchema FLINK_SCHEMA = ResolvedSchema.of( Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING())); @@ -100,6 +107,7 @@ private SimpleDataUtil() {} public static final Record RECORD = GenericRecord.create(SCHEMA); public static final Record RECORD2 = GenericRecord.create(SCHEMA2); + public static final Record RECORD3 = GenericRecord.create(SCHEMA3); public static Table createTable( String path, Map properties, boolean partitioned) { @@ -127,6 +135,16 @@ public static Record createRecord(Integer id, String data, String extra) { return record; } + public static Record createRecordWithRowId( + Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) { + Record record = RECORD3.copy(); + record.setField("id", id); + record.setField("data", data); + record.setField("_row_id", rowId); + record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber); + return record; + } + public static RowData createRowData(Integer id, String data) { return GenericRowData.of(id, StringData.fromString(data)); } @@ -348,6 +366,11 @@ public static void assertTableRecords(Table table, List expected) throws public static void assertTableRecords(Table table, List expected, String branch) throws IOException { + assertTableRecords(table, expected, branch, table.schema()); + } + + public static void assertTableRecords( + Table table, List expected, String branch, Schema projectSchema) throws IOException { table.refresh(); Snapshot snapshot = latestSnapshot(table, branch); @@ -360,12 +383,15 @@ public static void assertTableRecords(Table table, List expected, String return; } - Types.StructType type = table.schema().asStruct(); + Types.StructType type = projectSchema.asStruct(); StructLikeSet expectedSet = StructLikeSet.create(type); expectedSet.addAll(expected); try (CloseableIterable iterable = - IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) { + IcebergGenerics.read(table) + .useSnapshot(snapshot.snapshotId()) + .project(projectSchema) + .build()) { StructLikeSet actualSet = StructLikeSet.create(type); for (Record record : iterable) { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index 795057e23538..707038c925d5 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -33,6 +33,9 @@ import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.SimpleDataUtil; @@ -79,6 +82,118 @@ void testRewriteUnpartitioned() throws Exception { createRecord(4, "d"))); } + @Test + void testRewriteUnpartitionedPreserveLineage() throws Exception { + Table table = createTable(3); + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); + insert(table, 4, "d"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 1, 0); + + Schema schema = MetadataColumns.schemaWithRowLineage(table.schema()); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L), + SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L), + SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L), + SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)), + SnapshotRef.MAIN_BRANCH, + schema); + } + + @Test + void testRewriteTheSameFilePreserveLineage() throws Exception { + Table table = createTable(3); + insert(table, 1, "a"); + insert(table, 2, "b"); + // Create a file with two lines of data to verify that the rowid is read correctly. + insert( + table, + ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d"))); + + assertFileNum(table, 3, 0); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 1, 0); + + Schema schema = MetadataColumns.schemaWithRowLineage(table.schema()); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L), + SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L), + // The Ids 3 and 4 come from the same file, so the last updated sequence number should + // be the same. 
+ SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L), + SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)), + SnapshotRef.MAIN_BRANCH, + schema); + } + + @Test + void testRewritePartitionedPreserveLineage() throws Exception { + Table table = createPartitionedTable(3); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles(); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 2, 0); + + Schema schema = MetadataColumns.schemaWithRowLineage(table.schema()); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L), + SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L), + SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L), + SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)), + SnapshotRef.MAIN_BRANCH, + schema); + } + @Test void testRewritePartitioned() throws Exception { Table table = createPartitionedTable(); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 8460b392e278..5eecc5a803d3 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.util.List; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.execution.JobClient; @@ -49,6 +50,7 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.SimpleDataUtil; @@ -122,11 +124,10 @@ void after() throws IOException { } protected static Table createTable() { - // only test V2 tables as compaction doesn't support V3 with row lineage - return createTable("2"); + return createTable(2); } - protected static Table createTable(String formatVersion) { + protected static Table createTable(int formatVersion) { return CATALOG_EXTENSION .catalog() .createTable( @@ -136,12 +137,16 @@ protected static Table createTable(String formatVersion) { null, ImmutableMap.of( TableProperties.FORMAT_VERSION, - formatVersion, + String.valueOf(formatVersion), "flink.max-continuous-empty-commits", "100000")); } protected static Table createTableWithDelete() { + return createTableWithDelete(2); + } + + protected static Table createTableWithDelete(int formatVersion) { return CATALOG_EXTENSION .catalog() .createTable( @@ -149,10 +154,11 @@ protected static Table createTableWithDelete() { SCHEMA_WITH_PRIMARY_KEY, PartitionSpec.unpartitioned(), null, - ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true")); + ImmutableMap.of( + "format-version", String.valueOf(formatVersion), "write.upsert.enabled", "true")); } - protected static Table createPartitionedTable() { + protected static Table createPartitionedTable(int formatVersion) { return CATALOG_EXTENSION .catalog() .createTable( @@ -160,7 +166,15 @@ protected static 
Table createPartitionedTable() { SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(), null, - ImmutableMap.of("format-version", "2", "flink.max-continuous-empty-commits", "100000")); + ImmutableMap.of( + "format-version", + String.valueOf(formatVersion), + "flink.max-continuous-empty-commits", + "100000")); + } + + protected static Table createPartitionedTable() { + return createPartitionedTable(2); } protected void insert(Table table, Integer id, String data) throws IOException { @@ -169,6 +183,11 @@ protected void insert(Table table, Integer id, String data) throws IOException { table.refresh(); } + protected void insert(Table table, List records) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records); + table.refresh(); + } + protected void insert(Table table, Integer id, String data, String extra) throws IOException { new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra))); @@ -195,6 +214,35 @@ protected void update(Table table, Integer id, String oldData, String newData) table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit(); } + /** + * For the same identifier column id this methods simulate the following row operations: + *
+   *   • add an equality delete on oldData
+   *   • insert tempData
+   *   • add a position delete on tempData
+   *   • insert newData
+   *
+   * @param table to modify
+   * @param id the identifier column id
+   * @param oldData the old data to be deleted
+   * @param tempData the temp data to be inserted and deleted with a position delete
+   * @param newData the new data to be inserted
+   * @param formatVersion the format version to use
+   */
+  protected void update(
+      Table table, Integer id, String oldData, String tempData, String newData, int formatVersion)
+      throws IOException {
+    DataFile dataFile =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+            .writeFile(
+                Lists.newArrayList(
+                    SimpleDataUtil.createRecord(id, tempData),
+                    SimpleDataUtil.createRecord(id, newData)));
+    DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, formatVersion);
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+  }
+
   /**
    * For the same identifier column id this methods simulate the following row operations:
    *
  • add an equality delete on oldData @@ -217,7 +265,7 @@ protected void update(Table table, Integer id, String oldData, String tempData, SimpleDataUtil.createRecord(id, tempData), SimpleDataUtil.createRecord(id, newData))); DeleteFile eqDelete = writeEqualityDelete(table, id, oldData); - DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData); + DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, 2); table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit(); } @@ -237,6 +285,13 @@ protected void insertFullPartitioned(Table table, Integer id, String data) throw table.refresh(); } + protected void insertPartitioned(Table table, List records, String partition) + throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(TestHelpers.Row.of(partition), records); + table.refresh(); + } + protected void dropTable() { CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); } @@ -332,7 +387,8 @@ private DeleteFile writeEqualityDelete(Table table, Integer id, String oldData) } private DeleteFile writePosDelete( - Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException { + Table table, CharSequence path, Integer pos, Integer id, String oldData, int formatVersion) + throws IOException { File file = File.createTempFile("junit", null, warehouseDir.toFile()); assertThat(file.delete()).isTrue(); PositionDelete posDelete = PositionDelete.create(); @@ -341,7 +397,7 @@ private DeleteFile writePosDelete( nested.set(1, oldData); posDelete.set(path, pos, nested); return FileHelpers.writePosDeleteFile( - table, Files.localOutput(file), null, Lists.newArrayList(posDelete)); + table, Files.localOutput(file), null, Lists.newArrayList(posDelete), formatVersion); } static void trigger(OneInputStreamOperatorTestHarness harness) throws Exception { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java index 2d83f553e576..9f4f96e1065b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java @@ -22,7 +22,6 @@ import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles; import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Set; @@ -41,18 +40,6 @@ import org.junit.jupiter.api.Test; class TestDataFileRewritePlanner extends OperatorTestBase { - @Test - void testFailsOnV3Table() throws Exception { - Table table = createTable("3"); - Set expected = Sets.newHashSetWithExpectedSize(3); - insert(table, 1, "a"); - expected.addAll(newDataFiles(table)); - - assertThatThrownBy(() -> planDataFileRewrite(tableLoader())) - .hasMessageContaining( - "Flink does not support compaction on row lineage enabled tables (V3+)") - .isInstanceOf(IllegalArgumentException.class); - } @Test void testUnpartitioned() throws Exception { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java index 3c5a10328756..4e21c7a956e4 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -248,6 +248,26 @@ void testV2Table() throws Exception { ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); } + @Test + void testV3Table() throws Exception { + Table table = createTableWithDelete(3); + update(table, 1, null, "a", "b", 3); + update(table, 1, "b", "c"); + + List planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + List actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))), + 1, + ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); + } + @Test void testSplitSize() throws Exception { Table table = createTable(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index 81db62e8bf25..c50060e16a6c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -29,8 +29,6 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.SerializableTable; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableUtil; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.FileRewritePlan; import org.apache.iceberg.actions.RewriteDataFiles; @@ -93,10 +91,6 @@ public DataFileRewritePlanner( @Override public void open(OpenContext context) throws Exception { tableLoader.open(); - Table table = tableLoader.loadTable(); - Preconditions.checkArgument( - !TableUtil.supportsRowLineage(table), - "Flink does not support compaction on row lineage enabled tables (V3+)"); this.errorCounter = TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex) .counter(TableMaintenanceMetrics.ERROR_COUNTER); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java index ad3b0454008c..57b0e53d86e6 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java @@ -28,11 +28,15 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableUtil; import 
org.apache.iceberg.actions.RewriteFileGroup; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup; @@ -108,8 +112,10 @@ public void processElement(PlannedGroup value, Context ctx, Collector writer = writerFor(value)) { - try (DataIterator iterator = readerFor(value)) { + boolean preserveRowId = TableUtil.supportsRowLineage(value.table()); + + try (TaskWriter writer = writerFor(value, preserveRowId)) { + try (DataIterator iterator = readerFor(value, preserveRowId)) { while (iterator.hasNext()) { writer.write(iterator.next()); } @@ -169,30 +175,42 @@ public void processElement(PlannedGroup value, Context ctx, Collector writerFor(PlannedGroup value) { + private TaskWriter writerFor(PlannedGroup value, boolean preserveRowId) { String formatString = PropertyUtil.propertyAsString( value.table().properties(), TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + Schema writeSchema = + preserveRowId + ? MetadataColumns.schemaWithRowLineage(value.table().schema()) + : value.table().schema(); + RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema); RowDataTaskWriterFactory factory = new RowDataTaskWriterFactory( - value.table(), - FlinkSchemaUtil.convert(value.table().schema()), + value::table, + flinkWriteType, value.group().inputSplitSize(), FileFormat.fromString(formatString), value.table().properties(), null, - false); + false, + writeSchema, + value.table().spec()); factory.initialize(subTaskId, attemptId); return factory.create(); } - private DataIterator readerFor(PlannedGroup value) { + private DataIterator readerFor(PlannedGroup value, boolean preserveRowId) { + Schema projectedSchema = + preserveRowId + ? MetadataColumns.schemaWithRowLineage(value.table().schema()) + : value.table().schema(); + RowDataFileScanTaskReader reader = new RowDataFileScanTaskReader( value.table().schema(), - value.table().schema(), + projectedSchema, PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null), false, Collections.emptyList()); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index bf6f72cc287a..b8fb1ba32edf 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -46,7 +46,6 @@ import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PartitionUtil; @@ -84,13 +83,7 @@ public RowDataFileScanTaskReader( @Override public CloseableIterator open( FileScanTask task, InputFilesDecryptor inputFilesDecryptor) { - Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds()); - - Map idToConstant = - partitionSchema.columns().isEmpty() - ? 
ImmutableMap.of() - : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); - + Map idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor); CloseableIterable iterable = diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index 6db2b79f77d6..542376b06c27 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -89,6 +89,13 @@ private SimpleDataUtil() {} Types.NestedField.optional(2, "data", Types.StringType.get()), Types.NestedField.optional(3, "extra", Types.StringType.get())); + public static final Schema SCHEMA3 = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "_row_id", Types.LongType.get()), + Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get())); + public static final ResolvedSchema FLINK_SCHEMA = ResolvedSchema.of( Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING())); @@ -100,6 +107,7 @@ private SimpleDataUtil() {} public static final Record RECORD = GenericRecord.create(SCHEMA); public static final Record RECORD2 = GenericRecord.create(SCHEMA2); + public static final Record RECORD3 = GenericRecord.create(SCHEMA3); public static Table createTable( String path, Map properties, boolean partitioned) { @@ -127,6 +135,16 @@ public static Record createRecord(Integer id, String data, String extra) { return record; } + public static Record createRecordWithRowId( + Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) { + Record record = RECORD3.copy(); + record.setField("id", id); + record.setField("data", data); + record.setField("_row_id", rowId); + record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber); + return record; + } + public static RowData createRowData(Integer id, String data) { return GenericRowData.of(id, StringData.fromString(data)); } @@ -348,6 +366,11 @@ public static void assertTableRecords(Table table, List expected) throws public static void assertTableRecords(Table table, List expected, String branch) throws IOException { + assertTableRecords(table, expected, branch, table.schema()); + } + + public static void assertTableRecords( + Table table, List expected, String branch, Schema projectSchema) throws IOException { table.refresh(); Snapshot snapshot = latestSnapshot(table, branch); @@ -360,12 +383,15 @@ public static void assertTableRecords(Table table, List expected, String return; } - Types.StructType type = table.schema().asStruct(); + Types.StructType type = projectSchema.asStruct(); StructLikeSet expectedSet = StructLikeSet.create(type); expectedSet.addAll(expected); try (CloseableIterable iterable = - IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) { + IcebergGenerics.read(table) + .useSnapshot(snapshot.snapshotId()) + .project(projectSchema) + .build()) { StructLikeSet actualSet = StructLikeSet.create(type); for (Record record : iterable) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java 
index 795057e23538..707038c925d5 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -33,6 +33,9 @@ import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.SimpleDataUtil; @@ -79,6 +82,118 @@ void testRewriteUnpartitioned() throws Exception { createRecord(4, "d"))); } + @Test + void testRewriteUnpartitionedPreserveLineage() throws Exception { + Table table = createTable(3); + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); + insert(table, 4, "d"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 1, 0); + + Schema schema = MetadataColumns.schemaWithRowLineage(table.schema()); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L), + SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L), + SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L), + SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)), + SnapshotRef.MAIN_BRANCH, + schema); + } + + @Test + void testRewriteTheSameFilePreserveLineage() throws Exception { + Table table = createTable(3); + insert(table, 1, "a"); + insert(table, 2, "b"); + // Create a file with two lines of data to verify that the rowid is read correctly. + insert( + table, + ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d"))); + + assertFileNum(table, 3, 0); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 1, 0); + + Schema schema = MetadataColumns.schemaWithRowLineage(table.schema()); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L), + SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L), + // The Ids 3 and 4 come from the same file, so the last updated sequence number should + // be the same. 
+ SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L), + SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)), + SnapshotRef.MAIN_BRANCH, + schema); + } + + @Test + void testRewritePartitionedPreserveLineage() throws Exception { + Table table = createPartitionedTable(3); + insertPartitioned(table, 1, "p1"); + insertPartitioned(table, 2, "p1"); + insertPartitioned(table, 3, "p2"); + insertPartitioned(table, 4, "p2"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles(); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + assertFileNum(table, 2, 0); + + Schema schema = MetadataColumns.schemaWithRowLineage(table.schema()); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L), + SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L), + SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L), + SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)), + SnapshotRef.MAIN_BRANCH, + schema); + } + @Test void testRewritePartitioned() throws Exception { Table table = createPartitionedTable(); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index f9cbc9715cce..b9422a63d646 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.util.List; import javax.annotation.Nullable; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; @@ -49,6 +50,7 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.flink.HadoopCatalogExtension; import org.apache.iceberg.flink.SimpleDataUtil; @@ -122,11 +124,10 @@ void after() throws IOException { } protected static Table createTable() { - // only test V2 tables as compaction doesn't support V3 with row lineage - return createTable("2"); + return createTable(2); } - protected static Table createTable(String formatVersion) { + protected static Table createTable(int formatVersion) { return CATALOG_EXTENSION .catalog() .createTable( @@ -136,12 +137,16 @@ protected static Table createTable(String formatVersion) { null, ImmutableMap.of( TableProperties.FORMAT_VERSION, - formatVersion, + String.valueOf(formatVersion), "flink.max-continuous-empty-commits", "100000")); } protected static Table createTableWithDelete() { + return createTableWithDelete(2); + } + + protected static Table createTableWithDelete(int formatVersion) { return CATALOG_EXTENSION .catalog() .createTable( @@ -149,10 +154,11 @@ protected static Table createTableWithDelete() { SCHEMA_WITH_PRIMARY_KEY, PartitionSpec.unpartitioned(), null, - ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true")); + ImmutableMap.of( + "format-version", String.valueOf(formatVersion), "write.upsert.enabled", "true")); } - protected static Table createPartitionedTable() { + protected static Table createPartitionedTable(int formatVersion) { return CATALOG_EXTENSION .catalog() .createTable( @@ -160,7 +166,15 @@ protected static Table 
createPartitionedTable() { SimpleDataUtil.SCHEMA, PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(), null, - ImmutableMap.of("format-version", "2", "flink.max-continuous-empty-commits", "100000")); + ImmutableMap.of( + "format-version", + String.valueOf(formatVersion), + "flink.max-continuous-empty-commits", + "100000")); + } + + protected static Table createPartitionedTable() { + return createPartitionedTable(2); } protected void insert(Table table, Integer id, String data) throws IOException { @@ -169,6 +183,11 @@ protected void insert(Table table, Integer id, String data) throws IOException { table.refresh(); } + protected void insert(Table table, List records) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records); + table.refresh(); + } + protected void insert(Table table, Integer id, String data, String extra) throws IOException { new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra))); @@ -195,6 +214,35 @@ protected void update(Table table, Integer id, String oldData, String newData) table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit(); } + /** + * For the same identifier column id this methods simulate the following row operations: + *
+   *   • add an equality delete on oldData
+   *   • insert tempData
+   *   • add a position delete on tempData
+   *   • insert newData
+   *
+   * @param table to modify
+   * @param id the identifier column id
+   * @param oldData the old data to be deleted
+   * @param tempData the temp data to be inserted and deleted with a position delete
+   * @param newData the new data to be inserted
+   * @param formatVersion the format version to use
+   */
+  protected void update(
+      Table table, Integer id, String oldData, String tempData, String newData, int formatVersion)
+      throws IOException {
+    DataFile dataFile =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+            .writeFile(
+                Lists.newArrayList(
+                    SimpleDataUtil.createRecord(id, tempData),
+                    SimpleDataUtil.createRecord(id, newData)));
+    DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, formatVersion);
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+  }
+
   /**
    * For the same identifier column id this methods simulate the following row operations:
    *
  • add an equality delete on oldData @@ -217,7 +265,7 @@ protected void update(Table table, Integer id, String oldData, String tempData, SimpleDataUtil.createRecord(id, tempData), SimpleDataUtil.createRecord(id, newData))); DeleteFile eqDelete = writeEqualityDelete(table, id, oldData); - DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData); + DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, 2); table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit(); } @@ -237,6 +285,13 @@ protected void insertFullPartitioned(Table table, Integer id, String data) throw table.refresh(); } + protected void insertPartitioned(Table table, List records, String partition) + throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(TestHelpers.Row.of(partition), records); + table.refresh(); + } + protected void dropTable() { CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); } @@ -329,7 +384,8 @@ private DeleteFile writeEqualityDelete(Table table, Integer id, String oldData) } private DeleteFile writePosDelete( - Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException { + Table table, CharSequence path, Integer pos, Integer id, String oldData, int formatVersion) + throws IOException { File file = File.createTempFile("junit", null, warehouseDir.toFile()); assertThat(file.delete()).isTrue(); PositionDelete posDelete = PositionDelete.create(); @@ -338,7 +394,7 @@ private DeleteFile writePosDelete( nested.set(1, oldData); posDelete.set(path, pos, nested); return FileHelpers.writePosDeleteFile( - table, Files.localOutput(file), null, Lists.newArrayList(posDelete)); + table, Files.localOutput(file), null, Lists.newArrayList(posDelete), formatVersion); } static void trigger(OneInputStreamOperatorTestHarness harness) throws Exception { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java index 2d83f553e576..9f4f96e1065b 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java @@ -22,7 +22,6 @@ import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles; import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Set; @@ -41,18 +40,6 @@ import org.junit.jupiter.api.Test; class TestDataFileRewritePlanner extends OperatorTestBase { - @Test - void testFailsOnV3Table() throws Exception { - Table table = createTable("3"); - Set expected = Sets.newHashSetWithExpectedSize(3); - insert(table, 1, "a"); - expected.addAll(newDataFiles(table)); - - assertThatThrownBy(() -> planDataFileRewrite(tableLoader())) - .hasMessageContaining( - "Flink does not support compaction on row lineage enabled tables (V3+)") - .isInstanceOf(IllegalArgumentException.class); - } @Test void testUnpartitioned() throws Exception { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java index 3c5a10328756..4e21c7a956e4 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -248,6 +248,26 @@ void testV2Table() throws Exception { ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); } + @Test + void testV3Table() throws Exception { + Table table = createTableWithDelete(3); + update(table, 1, null, "a", "b", 3); + update(table, 1, "b", "c"); + + List planned = planDataFileRewrite(tableLoader()); + assertThat(planned).hasSize(1); + + List actual = executeRewrite(planned); + assertThat(actual).hasSize(1); + + assertRewriteFileGroup( + actual.get(0), + table, + records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))), + 1, + ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType()))); + } + @Test void testSplitSize() throws Exception { Table table = createTable();
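
Reviewer note, not part of the patch: a minimal sketch of how the test helpers introduced above are meant to be combined to check that a rewrite preserved row lineage. It assumes a V3 table named table that has already been compacted; the record values are placeholders.

// Project the row-lineage metadata columns (_row_id, _last_updated_sequence_number)
// alongside the data columns when reading the snapshot back.
Schema lineageSchema = MetadataColumns.schemaWithRowLineage(table.schema());

// Expected records keep the row ids and last-updated sequence numbers they had
// before the rewrite; only the number of data files should change.
SimpleDataUtil.assertTableRecords(
    table,
    ImmutableList.of(
        SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
        SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L)),
    SnapshotRef.MAIN_BRANCH,
    lineageSchema);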
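
Also illustrative only: a sketch of setting up such a test with the updated OperatorTestBase helpers, creating a format version 3 table (which enables row lineage), writing one multi-row data file so two rows share a file, and running the rewrite. The surrounding scaffolding (infra, appendRewriteDataFiles, runAndWaitForSuccess) is the existing TestRewriteDataFiles infrastructure.

Table table = createTable(3);
insert(table, 1, "a");
// One data file containing two rows, so both rows get their row ids from the
// same first-row-id assignment of that file and share its data sequence number.
insert(
    table,
    ImmutableList.of(SimpleDataUtil.createRecord(2, "b"), SimpleDataUtil.createRecord(3, "c")));

appendRewriteDataFiles();
runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());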