From a712d264e0261717f8c2c9e83f45262d4451a934 Mon Sep 17 00:00:00 2001 From: coder_wang Date: Mon, 9 Jan 2023 12:04:19 +0800 Subject: [PATCH] [HUDI-5515]Fix concurrency conflict in ClusteringOperator --- .../org/apache/hudi/sink/clustering/ClusteringOperator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 43e8a78d64a05..0a4bdf387c784 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -63,6 +63,7 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; @@ -172,6 +173,11 @@ public void processWatermark(Watermark mark) { // no need to propagate the watermark } + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) { + // no need to propagate the latencyMarker + } + @Override public void processElement(StreamRecord element) throws Exception { ClusteringPlanEvent event = element.getValue();