diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c60449a79768..0e5afa617798 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -289,6 +289,8 @@ public Pair, JavaRDD> syncOnce() throws IOException srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext); } + metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis()); + // Clear persistent RDDs jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); return result; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index 056713771652..0c2f18fa4279 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -95,6 +95,12 @@ public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) { } } + public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) { + if (config.isMetricsOn()) { + Metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs); + } + } + public long getDurationInMs(long ctxDuration) { return ctxDuration / 1000000; }