diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index fe19a2a850f0..80a9e2f9459d 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -222,4 +222,7 @@ private TableProperties() {
   public static final String MERGE_CARDINALITY_CHECK_ENABLED =
       "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
+  public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 10dab416091c..8415129db9a7 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -41,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
                       FileFormat format,
@@ -50,11 +51,13 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
                       long targetFileSize,
                       Schema schema,
                       RowType flinkSchema,
-                      List<Integer> equalityFieldIds) {
+                      List<Integer> equalityFieldIds,
+                      boolean upsert) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    this.upsert = upsert;
   }
 
   abstract RowDataDeltaWriter route(RowData row);
@@ -70,11 +73,19 @@ public void write(RowData row) throws IOException {
     switch (row.getRowKind()) {
       case INSERT:
       case UPDATE_AFTER:
+        if (upsert) {
+          writer.delete(row);
+        }
         writer.write(row);
         break;
 
-      case DELETE:
       case UPDATE_BEFORE:
+        if (upsert) {
+          break;  // UPDATE_BEFORE is not needed in upsert mode; skipping it avoids deleting the same row twice
+        }
+        writer.delete(row);
+        break;
+      case DELETE:
         writer.delete(row);
         break;
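The write path above only consumes the flag; as a hedged illustration of how the new write.upsert.enable property from TableProperties.java could be turned on for an existing table, the sketch below uses the standard UpdateProperties API. The Hadoop catalog, warehouse path, and table name are placeholders and are not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class EnableUpsertProperty {
  public static void main(String[] args) {
    // Placeholder warehouse location and table name, used only for illustration.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://nn:8020/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "sample"));

    // The Flink sink falls back to this property when upsert mode is not set on the job itself.
    table.updateProperties()
        .set("write.upsert.enable", "true")
        .commit();
  }
}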
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index f98a1bea259f..fafbd4f88f08 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
@@ -58,6 +59,8 @@
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE;
+import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
@@ -125,6 +128,7 @@ public static class Builder {
     private boolean overwrite = false;
     private DistributionMode distributionMode = null;
     private Integer writeParallelism = null;
+    private boolean upsert = false;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
 
@@ -212,6 +216,20 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, which means it
+     * will DELETE the old record and then INSERT the new record. In a partitioned table, the partition fields
+     * should be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted
+     * by a new row located in partition-B.
+     *
+     * @param enable indicates whether all INSERT/UPDATE_AFTER events should be transformed to UPSERT.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder upsert(boolean enable) {
+      this.upsert = enable;
+      return this;
+    }
+
     /**
      * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
      *
@@ -321,7 +339,27 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fall back to the upsert mode parsed from table properties if it is not specified at the job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+      // Validate the equality fields and partition fields if upsert mode is enabled.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enabled when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {
+          for (PartitionField partitionField : table.spec().fields()) {
+            Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
+                partitionField, equalityFieldColumns);
+          }
+        }
+      }
+
+      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);
 
       int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
       SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -390,7 +428,9 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
 
   static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                          RowType flinkRowType,
-                                                         List<Integer> equalityFieldIds) {
+                                                         List<Integer> equalityFieldIds,
+                                                         boolean upsert) {
+    Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
     Map<String, String> props = table.properties();
     long targetFileSize = getTargetFileSizeBytes(props);
     FileFormat fileFormat = getFileFormat(props);
@@ -398,7 +438,7 @@ static IcebergStreamWriter<RowData> createStreamWriter(Table table,
     Table serializableTable = SerializableTable.copyOf(table);
     TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
         serializableTable, flinkRowType, targetFileSize,
-        fileFormat, equalityFieldIds);
+        fileFormat, equalityFieldIds, upsert);
 
     return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
   }
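To show how the new builder option from FlinkSink.java is meant to be wired into a job, here is a minimal sketch. It follows the builder calls visible in this patch; the changelog source, table location, parallelism, and the "id" equality column are assumptions made for the example, not part of the change.

import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder changelog source; a real job would read RowData from a CDC connector.
    DataStream<RowData> source =
        env.fromCollection(Collections.<RowData>emptyList(), TypeInformation.of(RowData.class));

    // Placeholder table location.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/sample");

    FlinkSink.forRowData(source)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Arrays.asList("id"))  // must cover every partition field in upsert mode
        .upsert(true)                               // or omit and rely on write.upsert.enable
        .writeParallelism(2)
        .build();

    env.execute("iceberg-upsert-sink");
  }
}

For a partitioned table the equality columns must include every partition source field, which is exactly what the new precondition in appendWriter enforces before the stream writer is created.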
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index b2f8ceece9f8..1eee6298e933 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -49,8 +49,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
                          long targetFileSize,
                          Schema schema,
                          RowType flinkSchema,
-                         List<Integer> equalityFieldIds) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+                         List<Integer> equalityFieldIds,
+                         boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
     this.partitionKey = new PartitionKey(spec, schema);
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index e94f99feb66f..2849100858a1 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -46,6 +46,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final long targetFileSizeBytes;
   private final FileFormat format;
   private final List<Integer> equalityFieldIds;
+  private final boolean upsert;
   private final FileAppenderFactory<RowData> appenderFactory;
 
   private transient OutputFileFactory outputFileFactory;
@@ -54,7 +55,8 @@ public RowDataTaskWriterFactory(Table table,
                                   RowType flinkSchema,
                                   long targetFileSizeBytes,
                                   FileFormat format,
-                                  List<Integer> equalityFieldIds) {
+                                  List<Integer> equalityFieldIds,
+                                  boolean upsert) {
     this.table = table;
     this.schema = table.schema();
     this.flinkSchema = flinkSchema;
@@ -63,6 +65,7 @@ public RowDataTaskWriterFactory(Table table,
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
     this.equalityFieldIds = equalityFieldIds;
+    this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
@@ -95,10 +98,10 @@ public TaskWriter<RowData> create() {
     // Initialize a task writer to write both INSERT and equality DELETE.
     if (spec.isUnpartitioned()) {
       return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
     } else {
       return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
     }
   }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index 341e634df713..331ed7c78192 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -41,8 +41,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
                            long targetFileSize,
                            Schema schema,
                            RowType flinkSchema,
-                           List<Integer> equalityFieldIds) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+                           List<Integer> equalityFieldIds,
+                           boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
     this.writer = new RowDataDeltaWriter(null);
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index a6cd374c3044..5e1d6ceea066 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -77,7 +77,8 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
         flinkSchema,
         Long.MAX_VALUE,
         format,
-        null);
+        null,
+        false);
   }
 
   public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index bdda3fd0d3cd..71978fd8c453 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -334,6 +334,6 @@ private StructLikeSet actualRowSet(String... columns) throws IOException {
   private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
     return new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table), FlinkSchemaUtil.convert(table.schema()),
-        128 * 1024 * 1024, format, equalityFieldIds);
+        128 * 1024 * 1024, format, equalityFieldIds, false);
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index fd2a71ab2978..90f662c6e427 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -32,6 +32,7 @@
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
@@ -144,6 +145,7 @@ private List<Snapshot> findValidSnapshots(Table table) {
 
   private void testChangeLogs(List<String> equalityFieldColumns,
                               KeySelector<Row, Object> keySelector,
+                              boolean insertAsUpsert,
                               List<List<Row>> elementsPerCheckpoint,
                               List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
     DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
@@ -157,6 +159,7 @@ private void testChangeLogs(List<String> equalityFieldColumns,
         .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
         .writeParallelism(parallelism)
         .equalityFieldColumns(equalityFieldColumns)
+        .upsert(insertAsUpsert)
         .build();
 
     // Execute the program.
@@ -219,7 +222,8 @@ public void testChangeLogOnIdKey() throws Exception {
         ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
     );
 
-    testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), elementsPerCheckpoint, expectedRecords);
+    testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+        elementsPerCheckpoint, expectedRecords);
   }
 
   @Test
@@ -250,7 +254,8 @@ public void testChangeLogOnDataKey() throws Exception {
         ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
     );
 
-    testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), elementsPerCheckpoint, expectedRecords);
+    testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), false,
+        elementsPerCheckpoint, expectedRecords);
   }
 
   @Test
@@ -281,7 +286,7 @@ public void testChangeLogOnIdDataKey() throws Exception {
     );
 
     testChangeLogs(ImmutableList.of("data", "id"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        elementsPerCheckpoint, expectedRecords);
+        false, elementsPerCheckpoint, expectedRecords);
   }
 
   @Test
@@ -319,9 +324,124 @@ public void testChangeLogOnSameKey() throws Exception {
     );
 
     testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false, elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testUpsertModeCheck() throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+    FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .upsert(true);
+
+    AssertHelpers.assertThrows("Should fail because upsert mode and overwrite mode are enabled at the same time.",
+        IllegalStateException.class, "OVERWRITE mode shouldn't be enabled",
+        () -> builder.equalityFieldColumns(ImmutableList.of("id")).overwrite(true).build()
+    );
+
+    AssertHelpers.assertThrows("Should fail because equality field columns are empty.",
+        IllegalStateException.class, "Equality field columns shouldn't be empty",
+        () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).build()
+    );
+  }
+
+  @Test
+  public void testUpsertOnIdKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+U", 1, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 1, "ccc")
+        ),
+        ImmutableList.of(
+            row("+U", 1, "ddd"),
+            row("+I", 1, "eee")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb")),
+        ImmutableList.of(record(1, "ccc")),
+        ImmutableList.of(record(1, "eee"))
+    );
+
+    if (!partitioned) {
+      testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true,
+          elementsPerCheckpoint, expectedRecords);
+    } else {
+      AssertHelpers.assertThrows("Should fail because equality field columns don't include all partition keys",
+          IllegalStateException.class, "should be included in equality fields",
+          () -> {
+            testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint,
+                expectedRecords);
+            return null;
+          });
+    }
+  }
+
+  @Test
+  public void testUpsertOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+I", 2, "aaa"),
+            row("+I", 3, "bbb")
+        ),
+        ImmutableList.of(
+            row("+U", 4, "aaa"),
+            row("-U", 3, "bbb"),
+            row("+U", 5, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 6, "aaa"),
+            row("+U", 7, "bbb")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+        ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+        ImmutableList.of(record(6, "aaa"), record(7, "bbb"))
+    );
+
+    testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), true,
         elementsPerCheckpoint, expectedRecords);
   }
 
+  @Test
+  public void testUpsertOnIdDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+U", 1, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 2, "bbb"),
+            row("+I", 2, "ccc")
+        ),
+        ImmutableList.of(
+            row("+U", 1, "bbb"),
+            row("-U", 1, "ccc"),
+            row("-D", 1, "aaa")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+        ImmutableList.of(record(1, "bbb"), record(2, "ccc"))
+    );
+
+    testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        true, elementsPerCheckpoint, expectedRecords);
+  }
+
   private StructLikeSet expectedRowSet(Record... records) {
     return SimpleDataUtil.expectedRowSet(table, records);
   }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index e5ffd01042c3..741977541c08 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -340,7 +340,7 @@ private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter() throws Exception {
   private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
       Table icebergTable, TableSchema flinkSchema) throws Exception {
     RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
-    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null);
+    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null, false);
 
     OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new OneInputStreamOperatorTestHarness<>(
         streamWriter, 1, 1, 0);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 9eee57fcf286..562a75e53773 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -239,7 +239,7 @@ public void testRandomData() throws IOException {
   private TaskWriter<RowData> createTaskWriter(long targetFileSize) {
     TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table), (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
-        targetFileSize, format, null);
+        targetFileSize, format, null, false);
     taskWriterFactory.initialize(1, 1);
     return taskWriterFactory.create();
   }
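As a closing note, the job-level flag and the table property interact through a simple fallback. The standalone sketch below restates the decision that appendWriter makes in this patch; the method name resolveUpsertMode and its callers are hypothetical, while PropertyUtil.propertyAsBoolean and the property key come directly from the change.

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.PropertyUtil;

public class UpsertModeResolution {
  // Mirrors the fallback in FlinkSink.Builder: an explicit job-level upsert(true)
  // wins, otherwise write.upsert.enable on the table decides (default false).
  static boolean resolveUpsertMode(Table table, boolean jobLevelUpsert) {
    Map<String, String> props = table.properties();
    return jobLevelUpsert || PropertyUtil.propertyAsBoolean(props, "write.upsert.enable", false);
  }
}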