diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index d361179a1db15..2475e92f9607f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -101,6 +101,12 @@ public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) { } } + public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) { + if (config.isMetricsOn()) { + Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount); + } + } + public long getDurationInMs(long ctxDuration) { return ctxDuration / 1000000; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java index 6f2377fc7ce93..5561356cabc76 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java @@ -57,8 +57,10 @@ protected InputBatch> fetchNewData(Option lastCheckpointStr, long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { + metrics.updateDeltaStreamerKafkaMessageInCount(0); return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } + metrics.updateDeltaStreamerKafkaMessageInCount(totalNewMsgs); JavaRDD newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } catch (org.apache.kafka.common.errors.TimeoutException e) {