diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 7f0285e83ed2a..c37b2325ca76a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -31,7 +31,6 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.sink.common.AbstractStreamWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.table.action.commit.FlinkWriteHelper; @@ -53,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Properties; import java.util.Random; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -431,10 +429,8 @@ private boolean flushBucket(DataBucket bucket) { List records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - Properties props = new Properties(); - config.addAllToProperties(props); records = (List) FlinkWriteHelper.newInstance() - .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger); + .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger); } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); @@ -469,10 +465,8 @@ private void flushRemaining(boolean endInput) { List records = bucket.writeBuffer(); if (records.size() > 0) { if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - Properties props = new Properties(); - config.addAllToProperties(props); records = (List) FlinkWriteHelper.newInstance() - .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger); + .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger); } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant));