diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 5c3581aef3ec..4e650e9574e3 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -34,7 +34,6 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
@@ -90,10 +89,13 @@ public ParquetValueReader<RowData> message(
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<RowData> struct(
         Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
+
+      if (null == expected) {
+        return new RowDataReader(ImmutableList.of());
+      }
+
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
-      Map<Integer, Type> typesById = Maps.newHashMap();
-      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -102,51 +104,39 @@ public ParquetValueReader<RowData> struct(
         if (fieldType.getId() != null) {
           int id = fieldType.getId().intValue();
           readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
-          typesById.put(id, fieldType);
-          if (idToConstant.containsKey(id)) {
-            maxDefinitionLevelsById.put(id, fieldD);
-          }
         }
       }
 
-      List<Types.NestedField> expectedFields =
-          expected != null ? expected.fields() : ImmutableList.of();
+      int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+      List<Types.NestedField> expectedFields = expected.fields();
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
-      // Defaulting to parent max definition level
-      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (idToConstant.containsKey(id)) {
-          // containsKey is used because the constant may be null
-          int fieldMaxDefinitionLevel =
-              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
-          reorderedFields.add(
-              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
-        } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.position());
-        } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
-          reorderedFields.add(ParquetValueReaders.constant(false));
-        } else if (reader != null) {
-          reorderedFields.add(reader);
-        } else if (field.initialDefault() != null) {
-          reorderedFields.add(
-              ParquetValueReaders.constant(
-                  RowDataUtil.convertConstant(field.type(), field.initialDefault()),
-                  maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
-        } else if (field.isOptional()) {
-          reorderedFields.add(ParquetValueReaders.nulls());
-        } else {
-          throw new IllegalArgumentException(
-              String.format("Missing required field: %s", field.name()));
-        }
+        ParquetValueReader<?> reader =
+            ParquetValueReaders.replaceWithMetadataReader(
+                id, readersById.get(id), idToConstant, constantDefinitionLevel);
+        reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
       }
 
       return new RowDataReader(reorderedFields);
     }
 
+    private ParquetValueReader<?> defaultReader(
+        Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
+      if (reader != null) {
+        return reader;
+      } else if (field.initialDefault() != null) {
+        return ParquetValueReaders.constant(
+            RowDataUtil.convertConstant(field.type(), field.initialDefault()), constantDL);
+      } else if (field.isOptional()) {
+        return ParquetValueReaders.nulls();
+      }
+
+      throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
+    }
+
     @Override
     public ParquetValueReader<RowData> list(
         Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
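With the dedicated branches for constants, ROW_POSITION, and IS_DELETED folded into ParquetValueReaders.replaceWithMetadataReader plus defaultReader, any metadata value -- including the row-lineage columns -- now reaches the reader through the idToConstant map. A minimal sketch (not part of the patch) of how a caller could supply those constants when building a reader; the helper method name, schema, Parquet type, and constant values are invented for illustration:

// Sketch only: the field IDs are the reserved row-lineage metadata columns also used later in
// this patch (TestHelpers); 100L and 7L are hypothetical per-file values.
static ParquetValueReader<RowData> readerWithLineageConstants(
    Schema projectedSchema, MessageType parquetType) {
  Map<Integer, Long> idToConstant =
      ImmutableMap.of(
          MetadataColumns.ROW_ID.fieldId(), 100L, // first row id assigned to this data file
          MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 7L); // file sequence number
  return FlinkParquetReaders.buildReader(projectedSchema, parquetType, idToConstant);
}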
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 81db62e8bf25..c50060e16a6c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,8 +29,6 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.FileRewritePlan;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public DataFileRewritePlanner(
   @Override
   public void open(OpenContext context) throws Exception {
     tableLoader.open();
-    Table table = tableLoader.loadTable();
-    Preconditions.checkArgument(
-        !TableUtil.supportsRowLineage(table),
-        "Flink does not support compaction on row lineage enabled tables (V3+)");
     this.errorCounter =
         TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
             .counter(TableMaintenanceMetrics.ERROR_COUNTER);
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index ad3b0454008c..57b0e53d86e6 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -28,11 +28,15 @@
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.actions.RewriteFileGroup;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out)
-    try (TaskWriter<RowData> writer = writerFor(value)) {
-      try (DataIterator<RowData> iterator = readerFor(value)) {
+    boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
+
+    try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
+      try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
         while (iterator.hasNext()) {
           writer.write(iterator.next());
         }
@@ -169,30 +175,42 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out)
-  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+  private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
     String formatString =
         PropertyUtil.propertyAsString(
             value.table().properties(),
             TableProperties.DEFAULT_FILE_FORMAT,
             TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    Schema writeSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+    RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
     RowDataTaskWriterFactory factory =
         new RowDataTaskWriterFactory(
-            value.table(),
-            FlinkSchemaUtil.convert(value.table().schema()),
+            value::table,
+            flinkWriteType,
             value.group().inputSplitSize(),
             FileFormat.fromString(formatString),
             value.table().properties(),
             null,
-            false);
+            false,
+            writeSchema,
+            value.table().spec());
     factory.initialize(subTaskId, attemptId);
     return factory.create();
   }
 
-  private DataIterator<RowData> readerFor(PlannedGroup value) {
+  private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
+    Schema projectedSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+
     RowDataFileScanTaskReader reader =
         new RowDataFileScanTaskReader(
             value.table().schema(),
-            value.table().schema(),
+            projectedSchema,
             PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
             false,
             Collections.emptyList());
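Both writerFor and readerFor above widen the table schema with MetadataColumns.schemaWithRowLineage when the table supports row lineage, so the rewritten files carry the original lineage values forward instead of having fresh ones assigned at commit time. A small sketch (not part of the patch) of what that widening produces, using a hypothetical base schema whose lineage column names mirror SCHEMA3 in SimpleDataUtil further down:

// Sketch only: the base schema and field IDs are hypothetical.
Schema base =
    new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
Schema withLineage = MetadataColumns.schemaWithRowLineage(base);
// withLineage additionally exposes "_row_id" and "_last_updated_sequence_number" as long
// columns, which the compaction writer persists and the compaction reader projects.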
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index bf6f72cc287a..b8fb1ba32edf 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -46,7 +46,6 @@
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public RowDataFileScanTaskReader(
   @Override
   public CloseableIterator<RowData> open(
       FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
-    Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
-
-    Map<Integer, ?> idToConstant =
-        partitionSchema.columns().isEmpty()
-            ? ImmutableMap.of()
-            : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
-
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
     FlinkDeleteFilter deletes =
         new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
     CloseableIterable<RowData> iterable =
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 0071abfd9a3f..4a6622e48e70 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -90,6 +90,13 @@ private SimpleDataUtil() {}
           Types.NestedField.optional(2, "data", Types.StringType.get()),
           Types.NestedField.optional(3, "extra", Types.StringType.get()));
 
+  public static final Schema SCHEMA3 =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
+          Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));
+
   public static final ResolvedSchema FLINK_SCHEMA =
       ResolvedSchema.of(
           Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING()));
@@ -101,6 +108,7 @@ private SimpleDataUtil() {}
 
   public static final Record RECORD = GenericRecord.create(SCHEMA);
   public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
+  public static final Record RECORD3 = GenericRecord.create(SCHEMA3);
 
   public static Table createTable(
       String path, Map<String, String> properties, boolean partitioned) {
@@ -128,6 +136,16 @@ public static Record createRecord(Integer id, String data, String extra) {
     return record;
   }
 
+  public static Record createRecordWithRowId(
+      Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
+    Record record = RECORD3.copy();
+    record.setField("id", id);
+    record.setField("data", data);
+    record.setField("_row_id", rowId);
+    record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber);
+    return record;
+  }
+
   public static RowData createRowData(Integer id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }
@@ -319,6 +337,11 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
 
   public static void assertTableRecords(Table table, List<Record> expected, String branch)
       throws IOException {
+    assertTableRecords(table, expected, branch, table.schema());
+  }
+
+  public static void assertTableRecords(
+      Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
     table.refresh();
     Snapshot snapshot = latestSnapshot(table, branch);
 
@@ -327,12 +350,15 @@ public static void assertTableRecords(Table table, List<Record> expected, String
       return;
     }
 
-    Types.StructType type = table.schema().asStruct();
+    Types.StructType type = projectSchema.asStruct();
     StructLikeSet expectedSet = StructLikeSet.create(type);
     expectedSet.addAll(expected);
 
    try (CloseableIterable<Record> iterable =
-        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+        IcebergGenerics.read(table)
+            .useSnapshot(snapshot.snapshotId())
+            .project(projectSchema)
+            .build()) {
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index d8d3c5dc249b..5d2a70ea0e40 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -57,6 +57,7 @@
 import org.apache.flink.types.Row;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public static void assertRowData(
       LogicalType rowType,
       StructLike expectedRecord,
       RowData actualRowData) {
+    assertRowData(structType, rowType, expectedRecord, actualRowData, null, -1);
+  }
+
+  public static void assertRowData(
+      Types.StructType structType,
+      LogicalType rowType,
+      StructLike expectedRecord,
+      RowData actualRowData,
+      Map<Integer, ?> idToConstant,
+      int rowPosition) {
     if (expectedRecord == null && actualRowData == null) {
       return;
     }
@@ -197,21 +208,18 @@ public static void assertRowData(
         LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
         Object actualValue =
             FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData);
+        Object expectedValue;
         if (expectedField != null) {
-          assertEquals(
-              field.type(), logicalType, expected.getField(expectedField.name()), actualValue);
+          expectedValue = getExpectedValue(idToConstant, rowPosition, expectedField, expected);
         } else {
           // convert the initial value to generic because that is the data model used to generate
           // the expected records
-          assertEquals(
-              field.type(),
-              logicalType,
-              GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()),
-              actualValue);
+          expectedValue = GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
         }
-        pos += 1;
-      }
+        assertEquals(field.type(), logicalType, expectedValue, actualValue);
+        pos++;
+      }
     } else {
       for (int i = 0; i < types.size(); i += 1) {
         LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +231,30 @@ public static void assertRowData(
     }
   }
 
+  private static Object getExpectedValue(
+      Map<Integer, ?> idToConstant,
+      int pos,
+      Types.NestedField expectedField,
+      Record expected) {
+    Object expectedValue;
+    int id = expectedField.fieldId();
+    if (id == MetadataColumns.ROW_ID.fieldId()) {
+      expectedValue = expected.getField(expectedField.name());
+      if (expectedValue == null && idToConstant != null) {
+        expectedValue = (Long) idToConstant.get(id) + pos;
+      }
+    } else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
+      expectedValue = expected.getField(expectedField.name());
+      if (expectedValue == null && idToConstant != null) {
+        expectedValue = idToConstant.get(id);
+      }
+    } else {
+      expectedValue = expected.getField(expectedField.name());
+    }
+
+    return expectedValue;
+  }
+
   private static void assertEquals(
       Type type, LogicalType logicalType, Object expected, Object actual) {
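getExpectedValue above encodes the lineage-inheritance rule the readers implement: a row without a materialized _row_id takes the file's first-row-id constant plus its position in the file, and a missing _last_updated_sequence_number falls back to the file's sequence-number constant. A tiny worked sketch under assumed per-file constants (100 and 42 are invented values):

// Sketch only: constants keyed by MetadataColumns.ROW_ID / LAST_UPDATED_SEQUENCE_NUMBER.
long firstRowId = 100L;
long fileSequenceNumber = 42L;
for (int pos = 0; pos < 3; pos++) {
  long expectedRowId = firstRowId + pos; // 100, 101, 102 for the three rows of the file
  long expectedLastUpdated = fileSequenceNumber; // identical for every row of the file
}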
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index e6781356f711..4e8c9f03f84c 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -42,8 +42,10 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -74,6 +76,11 @@ protected boolean supportsTimestampNanos() {
     return true;
   }
 
+  @Override
+  protected boolean supportsRowLineage() {
+    return true;
+  }
+
   @Test
   public void testBuildReader() {
     MessageType fileSchema =
@@ -216,11 +223,10 @@ public void testTwoLevelList() throws IOException {
 
   private void writeAndValidate(
       Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
-    File testFile = File.createTempFile("junit", null, temp.toFile());
-    assertThat(testFile.delete()).isTrue();
+    OutputFile output = new InMemoryOutputFile();
 
     try (FileAppender<Record> writer =
-        Parquet.write(Files.localOutput(testFile))
+        Parquet.write(output)
             .schema(writeSchema)
             .createWriterFunc(GenericParquetWriter::create)
             .build()) {
@@ -228,17 +234,20 @@ private void writeAndValidate(
     }
 
     try (CloseableIterable<RowData> reader =
-        Parquet.read(Files.localInput(testFile))
+        Parquet.read(output.toInputFile())
             .project(expectedSchema)
-            .createReaderFunc(type -> FlinkParquetReaders.buildReader(expectedSchema, type))
+            .createReaderFunc(
+                type -> FlinkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
             .build()) {
-      Iterator<Record> expected = iterable.iterator();
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
 
-      for (int i = 0; i < NUM_RECORDS; i += 1) {
+      int pos = 0;
+      for (Record record : iterable) {
         assertThat(rows).hasNext();
-        TestHelpers.assertRowData(writeSchema.asStruct(), rowType, expected.next(), rows.next());
+        TestHelpers.assertRowData(
+            writeSchema.asStruct(), rowType, record, rows.next(), ID_TO_CONSTANT, pos++);
       }
+      assertThat(rows).isExhausted();
     }
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 795057e23538..cc8764e3b60e 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,9 @@
 import java.util.stream.StreamSupport;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -79,6 +82,118 @@ void testRewriteUnpartitioned() throws Exception {
             createRecord(4, "d")));
   }
 
+  @Test
+  void testRewriteUnpartitionedPreserveLineage() throws Exception {
+    Table table = createTable("3");
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    insert(table, 3, "c");
+    insert(table, 4, "d");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+            SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
+  @Test
+  void testRewriteTheSameFilePreserveLineage() throws Exception {
+    Table table = createTable("3");
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    // Create a file with two rows of data to verify that the row IDs are read correctly.
+    insert(
+        table,
+        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
+
+    assertFileNum(table, 3, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+            // IDs 3 and 4 come from the same file, so their last updated sequence number should
+            // be the same.
+            SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
+  @Test
+  void testRewritePartitionedPreserveLineage() throws Exception {
+    Table table = createPartitionedTable("3");
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles();
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 2, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L),
+            SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
   @Test
   void testRewritePartitioned() throws Exception {
     Table table = createPartitionedTable();
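The expected values in testRewriteTheSameFilePreserveLineage follow directly from the insert order in the test; spelled out (values taken from the test itself, not new data):

// Snapshot 1 writes "a"                    -> _row_id 0, _last_updated_sequence_number 1
// Snapshot 2 writes "b"                    -> _row_id 1, _last_updated_sequence_number 2
// Snapshot 3 writes "c" and "d" in one file -> _row_id 2 and 3, both with sequence number 3
// Compaction must reproduce exactly these values in the rewritten file.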
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index f9cbc9715cce..9a60bef83728 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
@@ -49,6 +50,7 @@
 import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -122,7 +124,6 @@ void after() throws IOException {
   }
 
   protected static Table createTable() {
-    // only test V2 tables as compaction doesn't support V3 with row lineage
     return createTable("2");
   }
 
@@ -152,7 +153,7 @@ protected static Table createTableWithDelete() {
         ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true"));
   }
 
-  protected static Table createPartitionedTable() {
+  protected static Table createPartitionedTable(String formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -160,7 +161,12 @@ protected static Table createPartitionedTable() {
             SimpleDataUtil.SCHEMA,
             PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
             null,
-            ImmutableMap.of("format-version", "2", "flink.max-continuous-empty-commits", "100000"));
+            ImmutableMap.of(
+                "format-version", formatVersion, "flink.max-continuous-empty-commits", "100000"));
+  }
+
+  protected static Table createPartitionedTable() {
+    return createPartitionedTable("2");
   }
 
   protected void insert(Table table, Integer id, String data) throws IOException {
@@ -169,6 +175,11 @@ protected void insert(Table table, Integer id, String data) throws IOException {
     table.refresh();
   }
 
+  protected void insert(Table table, List<Record> records) throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records);
+    table.refresh();
+  }
+
   protected void insert(Table table, Integer id, String data, String extra) throws IOException {
     new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
         .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
@@ -237,6 +248,13 @@ protected void insertFullPartitioned(Table table, Integer id, String data) throw
     table.refresh();
   }
 
+  protected void insertPartitioned(Table table, List<Record> records, String partition)
+      throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(TestHelpers.Row.of(partition), records);
+    table.refresh();
+  }
+
   protected void dropTable() {
     CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 2d83f553e576..9f4f96e1065b 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -22,7 +22,6 @@
 import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
 import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Set;
@@ -41,18 +40,6 @@
 import org.junit.jupiter.api.Test;
 
 class TestDataFileRewritePlanner extends OperatorTestBase {
-  @Test
-  void testFailsOnV3Table() throws Exception {
-    Table table = createTable("3");
-    Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3);
-    insert(table, 1, "a");
-    expected.addAll(newDataFiles(table));
-
-    assertThatThrownBy(() -> planDataFileRewrite(tableLoader()))
-        .hasMessageContaining(
-            "Flink does not support compaction on row lineage enabled tables (V3+)")
-        .isInstanceOf(IllegalArgumentException.class);
-  }
 
   @Test
   void testUnpartitioned() throws Exception {