From b047073dea5a36c29afed295d6e8efebee62b7aa Mon Sep 17 00:00:00 2001
From: huzheng
Date: Mon, 28 Dec 2020 15:26:16 +0800
Subject: [PATCH 1/2] Flink: Rewrite each INSERT as one DELETE followed by one INSERT when configured to use UPSERT.

---
 .../apache/iceberg/flink/sink/FlinkSink.java  |  92 ++++++++++++++-
 .../flink/sink/TestFlinkIcebergSinkV2.java    | 105 +++++++++++++++++-
 .../flink/sink/TestIcebergStreamWriter.java   |   4 +-
 3 files changed, 194 insertions(+), 7 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 8c4486aa28c6..11d9d347a1c3 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Array;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -31,14 +33,18 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -115,6 +121,7 @@ public static class Builder {
     private TableSchema tableSchema;
     private boolean overwrite = false;
     private Integer writeParallelism = null;
+    private boolean insertAsUpsert = false;
     private List equalityFieldColumns = null;
 
     private Builder() {
@@ -172,6 +179,20 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * All INSERT events from the input stream will be transformed into UPSERT events, which means the sink will
+     * DELETE the old record and then INSERT the new one. For a partitioned table, the partition fields must be a
+     * subset of the equality fields; otherwise an old row located in partition-A could not be deleted by a new row
+     * located in partition-B.
+     *
+     * @param enable indicates whether the sink should transform all INSERT events to UPSERT.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder insertAsUpsert(boolean enable) {
+      this.insertAsUpsert = enable;
+      return this;
+    }
+
     /**
      * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
* @@ -183,6 +204,28 @@ public Builder equalityFieldColumns(List columns) { return this; } + private DataStream transformInsertAsUpsert(RowType flinkSchema, DataStream dataStream) { + RowDataCloner cloner = new RowDataCloner(flinkSchema); + + return dataStream.flatMap((rowData, out) -> { + switch (rowData.getRowKind()) { + case INSERT: + out.collect(cloner.cloneAsDeleteRow(rowData)); + out.collect(rowData); + break; + + case DELETE: + case UPDATE_BEFORE: + case UPDATE_AFTER: + out.collect(rowData); + break; + + default: + throw new UnsupportedOperationException("Unknown row kind: " + rowData.getRowKind()); + } + }, RowDataTypeInfo.of(flinkSchema)); + } + @SuppressWarnings("unchecked") public DataStreamSink build() { Preconditions.checkArgument(rowDataInput != null, @@ -209,7 +252,24 @@ public DataStreamSink build() { } } - IcebergStreamWriter streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds); + // Convert the iceberg schema to flink's RowType. + RowType flinkSchema = convertToRowType(table, tableSchema); + + // Convert the INSERT stream to be an UPSERT stream if needed. + if (insertAsUpsert) { + Preconditions.checkState(!equalityFieldIds.isEmpty(), + "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); + if (!table.spec().isUnpartitioned()) { + for (PartitionField partitionField : table.spec().fields()) { + Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()), + "Partition field '%s' is not included in equality fields: '%s'", partitionField, equalityFieldColumns); + } + } + + rowDataInput = transformInsertAsUpsert(flinkSchema, rowDataInput); + } + + IcebergStreamWriter streamWriter = createStreamWriter(table, flinkSchema, equalityFieldIds); IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite); this.writeParallelism = writeParallelism == null ? 
rowDataInput.getParallelism() : writeParallelism; @@ -227,8 +287,7 @@ public DataStreamSink build() { } } - static IcebergStreamWriter createStreamWriter(Table table, TableSchema requestedSchema, - List equalityFieldIds) { + private static RowType convertToRowType(Table table, TableSchema requestedSchema) { Preconditions.checkArgument(table != null, "Iceberg table should't be null"); RowType flinkSchema; @@ -246,6 +305,13 @@ static IcebergStreamWriter createStreamWriter(Table table, TableSchema flinkSchema = FlinkSchemaUtil.convert(table.schema()); } + return flinkSchema; + } + + static IcebergStreamWriter createStreamWriter(Table table, RowType flinkSchema, + List equalityFieldIds) { + Preconditions.checkArgument(table != null, "Iceberg table should't be null"); + Map props = table.properties(); long targetFileSize = getTargetFileSizeBytes(props); FileFormat fileFormat = getFileFormat(props); @@ -267,4 +333,24 @@ private static long getTargetFileSizeBytes(Map properties) { WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); } + + private static class RowDataCloner implements Serializable { + private final RowData.FieldGetter[] fieldGetters; + + private RowDataCloner(RowType schema) { + List fieldTypes = schema.getChildren(); + this.fieldGetters = (RowData.FieldGetter[]) Array.newInstance(RowData.FieldGetter.class, fieldTypes.size()); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i); + } + } + + private RowData cloneAsDeleteRow(RowData oldRow) { + GenericRowData newRowData = new GenericRowData(RowKind.DELETE, fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + newRowData.setField(i, fieldGetters[i].getFieldOrNull(oldRow)); + } + return newRowData; + } + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 93222ddc4535..8de4f8b86187 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; @@ -132,6 +133,7 @@ private List findValidSnapshots(Table table) { private void testChangeLogs(List equalityFieldColumns, KeySelector keySelector, + boolean insertAsUpsert, List> elementsPerCheckpoint, List> expectedRecordsPerCheckpoint) throws Exception { DataStream dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO); @@ -145,6 +147,7 @@ private void testChangeLogs(List equalityFieldColumns, .tableSchema(SimpleDataUtil.FLINK_SCHEMA) .writeParallelism(parallelism) .equalityFieldColumns(equalityFieldColumns) + .insertAsUpsert(insertAsUpsert) .build(); // Execute the program. 
@@ -207,7 +210,8 @@ public void testChangeLogOnIdKey() throws Exception { ImmutableList.of(record(1, "ddd"), record(2, "ddd")) ); - testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), elementsPerCheckpoint, expectedRecords); + testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false, + elementsPerCheckpoint, expectedRecords); } @Test @@ -238,7 +242,8 @@ public void testChangeLogOnDataKey() throws Exception { ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")) ); - testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), elementsPerCheckpoint, expectedRecords); + testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), false, + elementsPerCheckpoint, expectedRecords); } @Test @@ -269,7 +274,7 @@ public void testChangeLogOnIdDataKey() throws Exception { ); testChangeLogs(ImmutableList.of("data", "id"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), - elementsPerCheckpoint, expectedRecords); + false, elementsPerCheckpoint, expectedRecords); } @Test @@ -307,9 +312,103 @@ public void testChangeLogOnSameKey() throws Exception { ); testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + false, elementsPerCheckpoint, expectedRecords); + } + + @Test + public void testUpsertOnIdKey() throws Exception { + List> elementsPerCheckpoint = ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("+I", 1, "bbb") + ), + ImmutableList.of( + row("+I", 1, "ccc") + ), + ImmutableList.of( + row("+I", 1, "ddd"), + row("+I", 1, "eee") + ) + ); + + List> expectedRecords = ImmutableList.of( + ImmutableList.of(record(1, "bbb")), + ImmutableList.of(record(1, "ccc")), + ImmutableList.of(record(1, "eee")) + ); + + if (!partitioned) { + testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, + elementsPerCheckpoint, expectedRecords); + } else { + AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys", + IllegalStateException.class, "not included in equality fields", + () -> { + testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint, + expectedRecords); + return null; + }); + } + } + + @Test + public void testUpsertOnDataKey() throws Exception { + List> elementsPerCheckpoint = ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("+I", 2, "aaa"), + row("+I", 3, "bbb") + ), + ImmutableList.of( + row("+I", 4, "aaa"), + row("-U", 3, "bbb"), + row("+U", 5, "bbb") + ), + ImmutableList.of( + row("+I", 6, "aaa"), + row("+I", 7, "bbb") + ) + ); + + List> expectedRecords = ImmutableList.of( + ImmutableList.of(record(2, "aaa"), record(3, "bbb")), + ImmutableList.of(record(4, "aaa"), record(5, "bbb")), + ImmutableList.of(record(6, "aaa"), record(7, "bbb")) + ); + + testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), true, elementsPerCheckpoint, expectedRecords); } + @Test + public void testUpsertOnIdDataKey() throws Exception { + List> elementsPerCheckpoint = ImmutableList.of( + ImmutableList.of( + row("+I", 1, "aaa"), + row("+I", 1, "aaa"), + row("+I", 2, "bbb") + ), + ImmutableList.of( + row("+I", 1, "aaa"), + row("-D", 2, "bbb"), + row("+I", 2, "ccc") + ), + ImmutableList.of( + row("-U", 1, "aaa"), + row("+U", 1, "bbb") + ) + ); + + List> expectedRecords = ImmutableList.of( + ImmutableList.of(record(1, "aaa"), record(2, "bbb")), + ImmutableList.of(record(1, "aaa"), 
record(2, "ccc")), + ImmutableList.of(record(1, "bbb"), record(2, "ccc")) + ); + + testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)), + true, elementsPerCheckpoint, expectedRecords); + } + private StructLikeSet expectedRowSet(Record... records) { return SimpleDataUtil.expectedRowSet(table, records); } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index c6c20e0624fb..a034e9068a8e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -32,6 +32,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -337,7 +338,8 @@ private OneInputStreamOperatorTestHarness createIcebergStr private OneInputStreamOperatorTestHarness createIcebergStreamWriter( Table icebergTable, TableSchema flinkSchema) throws Exception { - IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkSchema, null); + RowType rowType = (RowType) flinkSchema.toRowDataType().getLogicalType(); + IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, rowType, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( streamWriter, 1, 1, 0); From bab1b2238039e7b977cc566b5a13c3e37d3973e6 Mon Sep 17 00:00:00 2001 From: huzheng Date: Mon, 28 Dec 2020 20:31:23 +0800 Subject: [PATCH 2/2] Minor changes. 
--- .../flink/sink/BaseDeltaTaskWriter.java | 8 +- .../apache/iceberg/flink/sink/FlinkSink.java | 77 ++++--------------- .../flink/sink/PartitionedDeltaWriter.java | 6 +- .../flink/sink/RowDataTaskWriterFactory.java | 9 ++- .../flink/sink/UnpartitionedDeltaWriter.java | 6 +- .../iceberg/flink/source/RowDataRewriter.java | 3 +- .../flink/sink/TestDeltaTaskWriter.java | 2 +- .../flink/sink/TestFlinkIcebergSinkV2.java | 12 +-- .../flink/sink/TestIcebergStreamWriter.java | 2 +- .../iceberg/flink/sink/TestTaskWriters.java | 2 +- 10 files changed, 47 insertions(+), 80 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java index 10dab416091c..3696b446a435 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java @@ -41,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { private final Schema schema; private final Schema deleteSchema; private final RowDataWrapper wrapper; + private final boolean upsert; BaseDeltaTaskWriter(PartitionSpec spec, FileFormat format, @@ -50,11 +51,13 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter { long targetFileSize, Schema schema, RowType flinkSchema, - List equalityFieldIds) { + List equalityFieldIds, + boolean upsert) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.schema = schema; this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + this.upsert = upsert; } abstract RowDataDeltaWriter route(RowData row); @@ -70,6 +73,9 @@ public void write(RowData row) throws IOException { switch (row.getRowKind()) { case INSERT: case UPDATE_AFTER: + if (upsert) { + writer.delete(row); + } writer.write(row); break; diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 11d9d347a1c3..138cfa7bdae9 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -20,9 +20,7 @@ package org.apache.iceberg.flink.sink; import java.io.IOException; -import java.io.Serializable; import java.io.UncheckedIOException; -import java.lang.reflect.Array; import java.util.List; import java.util.Locale; import java.util.Map; @@ -33,15 +31,12 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionField; @@ -121,7 +116,7 @@ public static class Builder { private TableSchema tableSchema; private boolean overwrite = false; private Integer writeParallelism = null; - private boolean insertAsUpsert = false; + private boolean upsert = false; private List 
equalityFieldColumns = null;
 
     private Builder() {
@@ -180,16 +175,16 @@ public Builder writeParallelism(int newWriteParallelism) {
     }
 
     /**
-     * All INSERT events from the input stream will be transformed into UPSERT events, which means the sink will
-     * DELETE the old record and then INSERT the new one. For a partitioned table, the partition fields must be a
-     * subset of the equality fields; otherwise an old row located in partition-A could not be deleted by a new row
-     * located in partition-B.
+     * All INSERT/UPDATE_AFTER events from the input stream will be transformed into UPSERT events, which means the
+     * sink will DELETE the old record and then INSERT the new one. For a partitioned table, the partition fields
+     * must be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted by
+     * a new row located in partition-B.
      *
-     * @param enable indicates whether the sink should transform all INSERT events to UPSERT.
+     * @param enable indicates whether the sink should transform all INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link Builder} to connect the iceberg table.
      */
-    public Builder insertAsUpsert(boolean enable) {
-      this.insertAsUpsert = enable;
+    public Builder upsert(boolean enable) {
+      this.upsert = enable;
       return this;
     }
 
@@ -204,28 +199,6 @@ public Builder equalityFieldColumns(List columns) {
       return this;
     }
 
-    private DataStream transformInsertAsUpsert(RowType flinkSchema, DataStream dataStream) {
-      RowDataCloner cloner = new RowDataCloner(flinkSchema);
-
-      return dataStream.flatMap((rowData, out) -> {
-        switch (rowData.getRowKind()) {
-          case INSERT:
-            out.collect(cloner.cloneAsDeleteRow(rowData));
-            out.collect(rowData);
-            break;
-
-          case DELETE:
-          case UPDATE_BEFORE:
-          case UPDATE_AFTER:
-            out.collect(rowData);
-            break;
-
-          default:
-            throw new UnsupportedOperationException("Unknown row kind: " + rowData.getRowKind());
-        }
-      }, RowDataTypeInfo.of(flinkSchema));
-    }
-
     @SuppressWarnings("unchecked")
     public DataStreamSink build() {
       Preconditions.checkArgument(rowDataInput != null,
@@ -256,7 +229,7 @@ public DataStreamSink build() {
       RowType flinkSchema = convertToRowType(table, tableSchema);
 
       // Convert the INSERT stream to be an UPSERT stream if needed.
-      if (insertAsUpsert) {
+      if (upsert) {
         Preconditions.checkState(!equalityFieldIds.isEmpty(),
             "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
         if (!table.spec().isUnpartitioned()) {
@@ -265,11 +238,9 @@ public DataStreamSink build() {
             "Partition field '%s' is not included in equality fields: '%s'", partitionField, equalityFieldColumns);
         }
       }
-
-      rowDataInput = transformInsertAsUpsert(flinkSchema, rowDataInput);
     }
 
-    IcebergStreamWriter streamWriter = createStreamWriter(table, flinkSchema, equalityFieldIds);
+    IcebergStreamWriter streamWriter = createStreamWriter(table, flinkSchema, equalityFieldIds, upsert);
     IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
 
     this.writeParallelism = writeParallelism == null ?
rowDataInput.getParallelism() : writeParallelism; @@ -308,8 +279,10 @@ private static RowType convertToRowType(Table table, TableSchema requestedSchema return flinkSchema; } - static IcebergStreamWriter createStreamWriter(Table table, RowType flinkSchema, - List equalityFieldIds) { + static IcebergStreamWriter createStreamWriter(Table table, + RowType flinkSchema, + List equalityFieldIds, + boolean upsert) { Preconditions.checkArgument(table != null, "Iceberg table should't be null"); Map props = table.properties(); @@ -318,7 +291,7 @@ static IcebergStreamWriter createStreamWriter(Table table, RowType flin TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema, table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props, - equalityFieldIds); + equalityFieldIds, upsert); return new IcebergStreamWriter<>(table.name(), taskWriterFactory); } @@ -333,24 +306,4 @@ private static long getTargetFileSizeBytes(Map properties) { WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); } - - private static class RowDataCloner implements Serializable { - private final RowData.FieldGetter[] fieldGetters; - - private RowDataCloner(RowType schema) { - List fieldTypes = schema.getChildren(); - this.fieldGetters = (RowData.FieldGetter[]) Array.newInstance(RowData.FieldGetter.class, fieldTypes.size()); - for (int i = 0; i < fieldTypes.size(); i++) { - fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i); - } - } - - private RowData cloneAsDeleteRow(RowData oldRow) { - GenericRowData newRowData = new GenericRowData(RowKind.DELETE, fieldGetters.length); - for (int i = 0; i < fieldGetters.length; i++) { - newRowData.setField(i, fieldGetters[i].getFieldOrNull(oldRow)); - } - return newRowData; - } - } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java index b2f8ceece9f8..1eee6298e933 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java @@ -49,8 +49,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter { long targetFileSize, Schema schema, RowType flinkSchema, - List equalityFieldIds) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds); + List equalityFieldIds, + boolean upsert) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, + upsert); this.partitionKey = new PartitionKey(spec, schema); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index b0776f49d190..be7268da6670 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -49,6 +49,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { private final long targetFileSizeBytes; private final FileFormat format; private final List equalityFieldIds; + private final boolean upsert; private final FileAppenderFactory appenderFactory; private transient OutputFileFactory outputFileFactory; @@ -62,7 +63,8 @@ public RowDataTaskWriterFactory(Schema schema, long targetFileSizeBytes, FileFormat format, Map tableProperties, - List 
equalityFieldIds) { + List equalityFieldIds, + boolean upsert) { this.schema = schema; this.flinkSchema = flinkSchema; this.spec = spec; @@ -72,6 +74,7 @@ public RowDataTaskWriterFactory(Schema schema, this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; this.equalityFieldIds = equalityFieldIds; + this.upsert = upsert; if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, tableProperties, spec); @@ -104,10 +107,10 @@ public TaskWriter create() { // Initialize a task writer to write both INSERT and equality DELETE. if (spec.isUnpartitioned()) { return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds); + targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); } else { return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds); + targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java index 341e634df713..331ed7c78192 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java @@ -41,8 +41,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { long targetFileSize, Schema schema, RowType flinkSchema, - List equalityFieldIds) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds); + List equalityFieldIds, + boolean upsert) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, + upsert); this.writer = new RowDataDeltaWriter(null); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 8b4986dcd67b..7691366e9b9b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -81,7 +81,8 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption Long.MAX_VALUE, format, table.properties(), - null); + null, + false); } public List rewriteDataForTasks(DataStream dataStream, int parallelism) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index 603562bc70a3..1b157c9d6efb 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -333,6 +333,6 @@ private StructLikeSet actualRowSet(String... 
columns) throws IOException { private TaskWriterFactory createTaskWriterFactory(List equalityFieldIds) { return new RowDataTaskWriterFactory(table.schema(), FlinkSchemaUtil.convert(table.schema()), table.spec(), table.locationProvider(), table.io(), table.encryption(), 128 * 1024 * 1024, - format, table.properties(), equalityFieldIds); + format, table.properties(), equalityFieldIds, false); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java index 8de4f8b86187..6d68d022f8f7 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java @@ -147,7 +147,7 @@ private void testChangeLogs(List equalityFieldColumns, .tableSchema(SimpleDataUtil.FLINK_SCHEMA) .writeParallelism(parallelism) .equalityFieldColumns(equalityFieldColumns) - .insertAsUpsert(insertAsUpsert) + .upsert(insertAsUpsert) .build(); // Execute the program. @@ -320,13 +320,13 @@ public void testUpsertOnIdKey() throws Exception { List> elementsPerCheckpoint = ImmutableList.of( ImmutableList.of( row("+I", 1, "aaa"), - row("+I", 1, "bbb") + row("+U", 1, "bbb") ), ImmutableList.of( row("+I", 1, "ccc") ), ImmutableList.of( - row("+I", 1, "ddd"), + row("+U", 1, "ddd"), row("+I", 1, "eee") ) ); @@ -360,13 +360,13 @@ public void testUpsertOnDataKey() throws Exception { row("+I", 3, "bbb") ), ImmutableList.of( - row("+I", 4, "aaa"), + row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb") ), ImmutableList.of( row("+I", 6, "aaa"), - row("+I", 7, "bbb") + row("+U", 7, "bbb") ) ); @@ -385,7 +385,7 @@ public void testUpsertOnIdDataKey() throws Exception { List> elementsPerCheckpoint = ImmutableList.of( ImmutableList.of( row("+I", 1, "aaa"), - row("+I", 1, "aaa"), + row("+U", 1, "aaa"), row("+I", 2, "bbb") ), ImmutableList.of( diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index a034e9068a8e..a6b46ac61ed6 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -339,7 +339,7 @@ private OneInputStreamOperatorTestHarness createIcebergStr private OneInputStreamOperatorTestHarness createIcebergStreamWriter( Table icebergTable, TableSchema flinkSchema) throws Exception { RowType rowType = (RowType) flinkSchema.toRowDataType().getLogicalType(); - IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, rowType, null); + IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(icebergTable, rowType, null, false); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>( streamWriter, 1, 1, 0); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index 8439f7d80c41..84160773e26e 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java @@ -239,7 +239,7 @@ private TaskWriter createTaskWriter(long targetFileSize) { TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(), table.spec(), table.locationProvider(), 
table.io(), table.encryption(), - targetFileSize, format, table.properties(), null); + targetFileSize, format, table.properties(), null, false); taskWriterFactory.initialize(1, 1); return taskWriterFactory.create(); }
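
For reference, below is a minimal usage sketch of the option these two patches introduce; it is not part of the
patches themselves. The table location, column names, checkpoint interval, and the HadoopTables/TableLoader setup are
illustrative assumptions; forRow(...), table(...), tableLoader(...) and writeParallelism(...) are the pre-existing
FlinkSink builder calls, while equalityFieldColumns(...) and upsert(true) are the knobs exercised by these changes.
The target table needs to be an Iceberg format-version 2 table, since upsert mode relies on equality deletes.

import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.hadoop.HadoopTables;

public class IcebergUpsertSinkExample {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000L); // Iceberg commits data/delete files on checkpoint completion.

    // Two INSERT rows sharing the same key. With upsert(true) the second row is written as an
    // equality DELETE followed by an INSERT, so only (1, "bbb") remains visible in the table.
    RowTypeInfo rowTypeInfo = new RowTypeInfo(Types.INT, Types.STRING);
    DataStream<Row> input = env.fromCollection(
        Arrays.asList(Row.of(1, "aaa"), Row.of(1, "bbb")), rowTypeInfo);

    TableSchema flinkSchema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("data", DataTypes.STRING())
        .build();

    // Hypothetical location; the target table must be a format-version=2 table so that
    // equality deletes are supported.
    String location = "hdfs://nn:8020/warehouse/db/tbl";
    Table table = new HadoopTables().load(location);

    FlinkSink.forRow(input, flinkSchema)
        .table(table)
        .tableLoader(TableLoader.fromHadoopTable(location))
        .equalityFieldColumns(Arrays.asList("id")) // must include every partition field
        .upsert(true)                              // the option introduced by these patches
        .writeParallelism(1)
        .build();

    env.execute("iceberg-upsert-sink-example");
  }
}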