diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java index 65f70ad6aaf0f..48bc4fbafa0e9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java @@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.runtime.operators.TableStreamOperator; import org.apache.flink.table.runtime.util.StreamRecordCollector; @@ -102,6 +103,11 @@ public void processWatermark(Watermark mark) { // no need to propagate the watermark } + @Override + public void processLatencyMarker(LatencyMarker latencyMarker) { + // no need to propagate the latencyMarker + } + @Override public void processElement(StreamRecord record) throws Exception { final CompactionPlanEvent event = record.getValue();