@@ -289,6 +289,8 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
}

metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
Member

Just confirming that you don't want to emit a metric if there was an exception and it errored out.

Contributor Author

I would suggest not updating the metric if an error occurred. Personally, I just need a metric that also updates when a sync runs but produces no commit because there is no new data.
As I understand it, this patch is wrong because it also updates the metric if something fails. Am I right?

Member

If something fails, an exception should be raised from this function, and metrics.updateDeltaStreamerSyncMetrics(...) won't be called.

  1. If the sync runs but fails with an exception, no metric is published.
  2. If the sync runs but there is no data to sync, the metric is published.

If lastSync is not updating, then the process failed with an exception.
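
To illustrate the two cases above, here is a minimal sketch of the control flow. `LastSyncSketch` and its `fail`/`hasNewData` flags are hypothetical stand-ins rather than the actual DeltaSync code, and an unchecked exception stands in for the `IOException` that `syncOnce()` declares.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the sync loop; not the actual DeltaSync class.
class LastSyncSketch {
  // Most recently published lastSync value, empty until a sync completes.
  private OptionalLong lastSync = OptionalLong.empty();

  // Simulates one sync round. `fail` forces an exception, and
  // `hasNewData` controls whether a commit would happen.
  boolean syncOnce(boolean fail, boolean hasNewData, long nowMs) {
    if (fail) {
      // Stands in for any failure inside the real sync.
      throw new IllegalStateException("simulated sync failure");
    }
    // A real implementation would write and commit here when data exists.
    boolean committed = hasNewData;
    // Reached only when nothing above threw, with or without a commit.
    lastSync = OptionalLong.of(nowMs);
    return committed;
  }

  OptionalLong lastSync() {
    return lastSync;
  }
}
```

Because the gauge update is the last statement before the return, a thrown exception skips it, while an empty sync still reaches it.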

Member

Yes. This patch is okay as long as you can set your alerts to catch the case where no sync metrics are emitted for a period of time.

Another way to monitor would be to emit a 1 for success and a 0 for failure, which lets you set alerts on a rolling-window aggregate.

I have done both personally. I can't say one is much better than the other.
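
For reference, the 1/0 alternative could look roughly like the sketch below. `SyncOutcomeWindow` is a hypothetical helper, not part of this patch or of Hudi, and in practice the rolling aggregate would more likely be computed by the monitoring backend than in process.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper; names are illustrative and not part of Hudi.
class SyncOutcomeWindow {
  private final int capacity;
  private final Deque<Integer> outcomes = new ArrayDeque<>();
  private int sum = 0;

  SyncOutcomeWindow(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  // Records 1 for a successful sync and 0 for a failed one,
  // evicting the oldest sample once the window is full.
  void record(boolean success) {
    int value = success ? 1 : 0;
    if (outcomes.size() == capacity) {
      sum -= outcomes.removeFirst();
    }
    outcomes.addLast(value);
    sum += value;
  }

  // Runs the sync and records its outcome; the exception is rethrown
  // so callers still see the failure.
  void runAndRecord(Runnable sync) {
    try {
      sync.run();
      record(true);
    } catch (RuntimeException e) {
      record(false);
      throw e;
    }
  }

  // Fraction of successful syncs in the window. An empty window
  // reports 1.0 here, which is a design choice of this sketch.
  double successRate() {
    return outcomes.isEmpty() ? 1.0 : (double) sum / outcomes.size();
  }
}
```

An alert would then fire when `successRate()` drops below some threshold, rather than when a timestamp goes stale.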

Contributor Author

Thanks for your feedback! If you can confirm that an error throws an exception, it works as I intended.
Currently we monitor the commitTime metric. The problem is that we cannot distinguish between ingestion having stopped working and there simply being no records to ingest.
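
A rough sketch of how the two metrics might be combined on the alerting side follows. It assumes both lastSync and commitTime are available as epoch timestamps in milliseconds; `IngestionHealthSketch`, `classify`, and the `maxAgeMs` threshold are hypothetical names used only for illustration.

```java
// Hypothetical alerting helper; not part of Hudi.
class IngestionHealthSketch {
  enum Status { HEALTHY, IDLE_NO_NEW_DATA, STALLED }

  // Classifies ingestion health from the age of the two timestamps.
  static Status classify(long nowMs, long lastSyncMs, long lastCommitMs, long maxAgeMs) {
    if (nowMs - lastSyncMs > maxAgeMs) {
      // No sync has completed recently: ingestion itself is not running or keeps failing.
      return Status.STALLED;
    }
    if (nowMs - lastCommitMs > maxAgeMs) {
      // Syncs complete but nothing is committed: there is simply no new data.
      return Status.IDLE_NO_NEW_DATA;
    }
    return Status.HEALTHY;
  }
}
```

With only commitTime, the STALLED and IDLE_NO_NEW_DATA cases look identical, which is the gap this metric is meant to close.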


// Clear persistent RDDs
jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
return result;
@@ -95,6 +95,12 @@ public void updateDeltaStreamerKafkaDelayCountMetrics(long kafkaDelayCount) {
    }
  }

  public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) {
    if (config.isMetricsOn()) {
      Metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs);
    }
  }

  public long getDurationInMs(long ctxDuration) {
    return ctxDuration / 1000000;
  }