@@ -208,6 +208,14 @@ public String branch() {
.parse();
}

public String uidSuffix() {
return confParser
.stringConf()
.option(FlinkWriteOptions.UID_SUFFIX.key())
.defaultValue(FlinkWriteOptions.UID_SUFFIX.defaultValue())
.parse();
}

public Integer writeParallelism() {
return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
}
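
For reviewers: since the new getter only calls .option(...) and .defaultValue(...), the suffix is resolved from the per-sink write options map and otherwise falls back to the declared default (the empty string); neither the Flink configuration nor table properties are consulted. A minimal sketch of that resolution, assuming a table loaded elsewhere and the three-argument FlinkWriteConf constructor (table, write options, readable config):

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Sketch only, not part of the change: exercise the new getter directly.
Map<String, String> writeOptions =
    ImmutableMap.of(FlinkWriteOptions.UID_SUFFIX.key(), "branch-a");
FlinkWriteConf writeConf = new FlinkWriteConf(table, writeOptions, new Configuration());
writeConf.uidSuffix(); // "branch-a"; "" when the option is unset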
@@ -87,4 +87,8 @@ private FlinkWriteOptions() {}
@Experimental
public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();

// Specifies the uid suffix to be used for the underlying IcebergSink
public static final ConfigOption<String> UID_SUFFIX =
ConfigOptions.key("uid-suffix").stringType().defaultValue("");
}
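
A quick sketch of how the new option behaves with Flink's Configuration API; the empty-string default effectively means "no suffix configured", which append() further down turns into a table-name-based fallback:

import org.apache.flink.configuration.Configuration;

// Sketch only, not part of the change.
Configuration flinkConf = new Configuration();
flinkConf.get(FlinkWriteOptions.UID_SUFFIX); // "" (the declared default)
flinkConf.set(FlinkWriteOptions.UID_SUFFIX, "branch-a");
flinkConf.get(FlinkWriteOptions.UID_SUFFIX); // "branch-a"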
@@ -320,7 +320,6 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {

public static class Builder implements IcebergSinkBuilder<Builder> {
private TableLoader tableLoader;
private String uidSuffix = "";
private Function<String, DataStream<RowData>> inputCreator = null;
@Deprecated private TableSchema tableSchema;
private ResolvedSchema resolvedSchema;
@@ -596,7 +595,7 @@ public Builder equalityFieldColumns(List<String> columns) {
* @return {@link Builder} to connect the iceberg table.
*/
public Builder uidSuffix(String newSuffix) {
this.uidSuffix = newSuffix;
writeOptions.put(FlinkWriteOptions.UID_SUFFIX.key(), newSuffix);
return this;
}

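With the builder field removed, uidSuffix(...) is now just sugar over the write-options map, so the two forms below should be equivalent (a sketch assuming an existing DataStream<RowData> input, a TableLoader tableLoader, and the builder's generic set(key, value) option setter):

// Sketch only. Form 1: the dedicated builder method.
IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .uidSuffix("branch-a")
    .append();

// Form 2: the raw write option, same effect after this change.
IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .set(FlinkWriteOptions.UID_SUFFIX.key(), "branch-a")
    .append();
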
@@ -665,11 +664,12 @@ IcebergSink build() {

FlinkMaintenanceConfig flinkMaintenanceConfig =
new FlinkMaintenanceConfig(table, writeOptions, readableConfig);

return new IcebergSink(
tableLoader,
table,
snapshotSummary,
uidSuffix,
flinkWriteConf.uidSuffix(),
SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, table),
resolvedSchema != null
? toFlinkRowType(table.schema(), resolvedSchema)
@@ -690,7 +690,7 @@ IcebergSink build() {
@Override
public DataStreamSink<RowData> append() {
IcebergSink sink = build();
String suffix = defaultSuffix(uidSuffix, table.name());
String suffix = defaultSuffix(sink.uidSuffix, table.name());
DataStream<RowData> rowDataInput = inputCreator.apply(suffix);
// Please note that the V2 sink framework will apply the uid here to the framework-created
// operators like writer,
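
For tracing the append() change: defaultSuffix (outside this diff) preserves the old behavior for sinks that never configure a suffix, falling back to the table name when the configured value is null or empty. Paraphrased, not the literal method body:

// Paraphrase of the fallback used above; assumes null and "" both mean "unset".
private static String defaultSuffix(String uidSuffix, String defaultSuffix) {
  if (uidSuffix == null || uidSuffix.isEmpty()) {
    return defaultSuffix;
  }
  return uidSuffix;
}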
@@ -263,4 +263,59 @@ public void testInsertIntoPartition() throws Exception {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}

// Test multiple IcebergSink instances writing to the same table in separate DAG branches.
// This ensures the v2 sink can handle multiple sink operators for the same table
// without naming collisions or operator conflicts when using statement set execution.
@TestTemplate
public void testIcebergSinkDifferentDAG() throws Exception {
assumeThat(useV2Sink).isTrue();

// Disable sink reuse optimization to force creation of two separate IcebergSink instances
getTableEnv()
.getConfig()
.getConfiguration()
.setString("table.optimizer.reuse-sink-enabled", "false");

// Register the rows into temporary tables.
getTableEnv()
.createTemporaryView(
"sourceTable",
getTableEnv()
.fromValues(
SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
Expressions.row(1, "hello"),
Expressions.row(2, "world"),
Expressions.row(3, (String) null),
Expressions.row(null, "bar")));

getTableEnv()
.createTemporaryView(
"sourceTable1",
getTableEnv()
.fromValues(
SimpleDataUtil.FLINK_SCHEMA.toSourceRowDataType(),
Expressions.row(1, "hello"),
Expressions.row(2, "world"),
Expressions.row(3, (String) null),
Expressions.row(null, "bar")));

// Redirect the records from the source tables to the destination table.
sql(
"EXECUTE STATEMENT SET\n"
+ "BEGIN\n"
+ "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source1') */ SELECT id,data from sourceTable;\n"
+ "INSERT INTO %s /*+ OPTIONS('uid-suffix'='source2') */ SELECT id,data from sourceTable1;\n"
+ "END;",
TABLE_NAME, TABLE_NAME);

// Assert the table records as expected.
SimpleDataUtil.assertTableRecords(
icebergTable,
Lists.newArrayList(
SimpleDataUtil.createRecord(1, "hello"),
SimpleDataUtil.createRecord(2, "world"),
SimpleDataUtil.createRecord(3, null),
SimpleDataUtil.createRecord(null, "bar")));
}
}