diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 222a1e810468..66fd09807794 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -208,6 +208,14 @@ public String branch() {
         .parse();
   }
 
+  public String uidSuffix() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.UID_SUFFIX.key())
+        .defaultValue(FlinkWriteOptions.UID_SUFFIX.defaultValue())
+        .parse();
+  }
+
   public Integer writeParallelism() {
     return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
   }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 6bdb01c3f5d3..e68e64ac573d 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -87,4 +87,8 @@ private FlinkWriteOptions() {}
   @Experimental
   public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
       ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
+
+  // Specifies the uid suffix to be used for the underlying IcebergSink.
+  public static final ConfigOption<String> UID_SUFFIX =
+      ConfigOptions.key("uid-suffix").stringType().defaultValue("");
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index f2d3e1fe44a4..f27db4f2b4cf 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -320,7 +320,6 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
 
   public static class Builder implements IcebergSinkBuilder<Builder> {
     private TableLoader tableLoader;
-    private String uidSuffix = "";
     private Function<String, DataStream<RowData>> inputCreator = null;
     @Deprecated private TableSchema tableSchema;
     private ResolvedSchema resolvedSchema;
@@ -596,7 +595,7 @@ public Builder equalityFieldColumns(List<String> columns) {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder uidSuffix(String newSuffix) {
-      this.uidSuffix = newSuffix;
+      writeOptions.put(FlinkWriteOptions.UID_SUFFIX.key(), newSuffix);
       return this;
     }
 
@@ -665,11 +664,12 @@ IcebergSink build() {
 
       FlinkMaintenanceConfig flinkMaintenanceConfig =
           new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+
       return new IcebergSink(
           tableLoader,
           table,
           snapshotSummary,
-          uidSuffix,
+          flinkWriteConf.uidSuffix(),
           SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
           resolvedSchema != null
               ? toFlinkRowType(table.schema(), resolvedSchema)
@@ -690,7 +690,7 @@ IcebergSink build() {
     @Override
     public DataStreamSink<RowData> append() {
       IcebergSink sink = build();
-      String suffix = defaultSuffix(uidSuffix, table.name());
+      String suffix = defaultSuffix(sink.uidSuffix, table.name());
       DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
       // Please note that V2 sink framework will apply the uid here to the framework created
       // operators like writer,
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index d99f657a11cc..572ff68202fb 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -263,4 +263,59 @@ public void testInsertIntoPartition() throws Exception {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
   }
+
+  // Test multiple IcebergSink instances writing to the same table in separate DAG branches.
+  // This ensures the v2 sink can handle multiple sink operators for the same table
+  // without naming collisions or operator conflicts when using statement set execution.
+  @TestTemplate
+  public void testIcebergSinkDifferentDAG() throws Exception {
+    assumeThat(useV2Sink).isTrue();
+
+    // Disable sink reuse optimization to force creation of two separate IcebergSink instances.
+    getTableEnv()
+        .getConfig()
+        .getConfiguration()
+        .setString("table.optimizer.reuse-sink-enabled", "false");
+
+    // Register the rows into temporary tables.
+    getTableEnv()
+        .createTemporaryView(
+            "sourceTable",
+            getTableEnv()
+                .fromValues(
+                    SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+                    Expressions.row(1, "hello"),
+                    Expressions.row(2, "world"),
+                    Expressions.row(3, (String) null),
+                    Expressions.row(null, "bar")));
+
+    getTableEnv()
+        .createTemporaryView(
+            "sourceTable1",
+            getTableEnv()
+                .fromValues(
+                    SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+                    Expressions.row(1, "hello"),
+                    Expressions.row(2, "world"),
+                    Expressions.row(3, (String) null),
+                    Expressions.row(null, "bar")));
+
+    // Redirect the records from the source tables to the destination table.
+    sql(
+        "EXECUTE STATEMENT SET\n"
+            + "BEGIN\n"
+            + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source1') */ SELECT id,data from sourceTable;\n"
+            + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source2') */ SELECT id,data from sourceTable1;\n"
+            + "END;",
+        TABLE_NAME, TABLE_NAME);
+
+    // Assert the table records are as expected.
+    SimpleDataUtil.assertTableRecords(
+        icebergTable,
+        Lists.newArrayList(
+            SimpleDataUtil.createRecord(1, "hello"),
+            SimpleDataUtil.createRecord(2, "world"),
+            SimpleDataUtil.createRecord(3, null),
+            SimpleDataUtil.createRecord(null, "bar")));
+  }
 }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 222a1e810468..66fd09807794 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -208,6 +208,14 @@ public String branch() {
         .parse();
   }
 
+  public String uidSuffix() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.UID_SUFFIX.key())
+        .defaultValue(FlinkWriteOptions.UID_SUFFIX.defaultValue())
+        .parse();
+  }
+
   public Integer writeParallelism() {
     return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
   }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index 6bdb01c3f5d3..e68e64ac573d 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -87,4 +87,8 @@ private FlinkWriteOptions() {}
   @Experimental
   public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
       ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
+
+  // Specifies the uid suffix to be used for the underlying IcebergSink.
+  public static final ConfigOption<String> UID_SUFFIX =
+      ConfigOptions.key("uid-suffix").stringType().defaultValue("");
 }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 752882a9d6c2..593230262fde 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -321,7 +321,6 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
 
   public static class Builder implements IcebergSinkBuilder<Builder> {
     private TableLoader tableLoader;
-    private String uidSuffix = "";
     private Function<String, DataStream<RowData>> inputCreator = null;
     @Deprecated private TableSchema tableSchema;
     private ResolvedSchema resolvedSchema;
@@ -597,7 +596,7 @@ public Builder equalityFieldColumns(List<String> columns) {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder uidSuffix(String newSuffix) {
-      this.uidSuffix = newSuffix;
+      writeOptions.put(FlinkWriteOptions.UID_SUFFIX.key(), newSuffix);
       return this;
     }
 
@@ -666,11 +665,12 @@ IcebergSink build() {
 
       FlinkMaintenanceConfig flinkMaintenanceConfig =
           new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+
      return new IcebergSink(
           tableLoader,
           table,
           snapshotSummary,
-          uidSuffix,
+          flinkWriteConf.uidSuffix(),
           SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
           resolvedSchema != null
               ? toFlinkRowType(table.schema(), resolvedSchema)
@@ -691,7 +691,7 @@ IcebergSink build() {
     @Override
     public DataStreamSink<RowData> append() {
       IcebergSink sink = build();
-      String suffix = defaultSuffix(uidSuffix, table.name());
+      String suffix = defaultSuffix(sink.uidSuffix, table.name());
       DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
       // Please note that V2 sink framework will apply the uid here to the framework created
       // operators like writer,
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index d99f657a11cc..572ff68202fb 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -263,4 +263,59 @@ public void testInsertIntoPartition() throws Exception {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
   }
+
+  // Test multiple IcebergSink instances writing to the same table in separate DAG branches.
+  // This ensures the v2 sink can handle multiple sink operators for the same table
+  // without naming collisions or operator conflicts when using statement set execution.
+  @TestTemplate
+  public void testIcebergSinkDifferentDAG() throws Exception {
+    assumeThat(useV2Sink).isTrue();
+
+    // Disable sink reuse optimization to force creation of two separate IcebergSink instances.
+    getTableEnv()
+        .getConfig()
+        .getConfiguration()
+        .setString("table.optimizer.reuse-sink-enabled", "false");
+
+    // Register the rows into temporary tables.
+    getTableEnv()
+        .createTemporaryView(
+            "sourceTable",
+            getTableEnv()
+                .fromValues(
+                    SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+                    Expressions.row(1, "hello"),
+                    Expressions.row(2, "world"),
+                    Expressions.row(3, (String) null),
+                    Expressions.row(null, "bar")));
+
+    getTableEnv()
+        .createTemporaryView(
+            "sourceTable1",
+            getTableEnv()
+                .fromValues(
+                    SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
+                    Expressions.row(1, "hello"),
+                    Expressions.row(2, "world"),
+                    Expressions.row(3, (String) null),
+                    Expressions.row(null, "bar")));
+
+    // Redirect the records from the source tables to the destination table.
+    sql(
+        "EXECUTE STATEMENT SET\n"
+            + "BEGIN\n"
+            + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source1') */ SELECT id,data from sourceTable;\n"
+            + "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source2') */ SELECT id,data from sourceTable1;\n"
+            + "END;",
+        TABLE_NAME, TABLE_NAME);
+
+    // Assert the table records are as expected.
+    SimpleDataUtil.assertTableRecords(
+        icebergTable,
+        Lists.newArrayList(
+            SimpleDataUtil.createRecord(1, "hello"),
+            SimpleDataUtil.createRecord(2, "world"),
+            SimpleDataUtil.createRecord(3, null),
+            SimpleDataUtil.createRecord(null, "bar")));
+  }
 }
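
For readers wiring this up from the DataStream API rather than via SQL hints, here is a minimal sketch of the two ways the new option can be supplied. This is illustrative only: the method name, the input streams, and the table loader are hypothetical, and it assumes the builder exposes a set(String, String) write-option setter mirroring FlinkSink's builder (the diff above only shows uidSuffix(String) delegating to writeOptions).

// UidSuffixExample.java — illustrative sketch, not part of this patch.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergSink;

public class UidSuffixExample {

  // Attach two IcebergSinks on the same table within one job; distinct uid
  // suffixes keep the generated operator uids and names from colliding.
  static void appendBothBranches(
      DataStream<RowData> branchA, DataStream<RowData> branchB, TableLoader tableLoader) {

    // Route 1: the Builder method, which after this change records the
    // 'uid-suffix' write option instead of a private Builder field.
    IcebergSink.forRowData(branchA).tableLoader(tableLoader).uidSuffix("branch-a").append();

    // Route 2: the write option directly; this is the same key the SQL hint
    // /*+ OPTIONS('uid-suffix'='...') */ sets. (Assumes a set(...) option
    // setter on the builder.)
    IcebergSink.forRowData(branchB).tableLoader(tableLoader).set("uid-suffix", "branch-b").append();
  }
}

The design point of the patch is that routing the suffix through writeOptions lets the programmatic builder path and the SQL hint path resolve the value through the same FlinkWriteConf lookup, which is what allows the new test to distinguish two sinks on the same table purely via per-statement hints.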