-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24863][SS] Report Kafka offset lag as a custom metrics #21819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #93296 has finished for PR 21819 at commit
|
c1fc3ca to
7129c3f
Compare
|
@HeartSaVioR @HyukjinKwon @jose-torres @tdas would you mind taking a look? |
|
Test build #94582 has finished for PR 21819 at commit
|
| * Write per-topic partition lag as json string | ||
| */ | ||
| def partitionLags(latestOffsets: Map[TopicPartition, Long], | ||
| processedOffsets: Map[TopicPartition, Long]): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def partitionLags(
latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]): String = {per https://github.com/databricks/scala-style-guide#spacing-and-indentation
Please feel free to address it with other comments later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed. would it be possible to add this to scala style checks ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please go ahead if possible.
| */ | ||
| def partitionLags(latestOffsets: Map[TopicPartition, Long], | ||
| processedOffsets: Map[TopicPartition, Long]): String = { | ||
| val result = new HashMap[String, HashMap[Int, Long]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: HashMap.empty[String, HashMap[Int, Long]]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had followed the style in other parts of the class. Addressed and refactored the other places as well.
|
Test build #94699 has finished for PR 21819 at commit
|
|
retest this please |
|
cc @koeninger as well |
|
Test build #94726 has finished for PR 21819 at commit
|
|
@HyukjinKwon , can you take it forward? Appreciate your effort and thanks in advance. |
|
Let me leave this in few days in case someone has more comments on this. |
|
Merged to master. |
## What changes were proposed in this pull request? Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out. ## How was this patch tested? Jenkins Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This builds on top of SPARK-24748 to report 'offset lag' as a custom metrics for Kafka structured streaming source.
This lag is the difference between the latest offsets in Kafka the time the metrics is reported (just after a micro-batch completes) and the latest offset Spark has processed. It can be 0 (or close to 0) if spark keeps up with the rate at which messages are ingested into Kafka topics in steady state. This measures how far behind the spark source has fallen behind (per partition) and can aid in tuning the application.
How was this patch tested?
Existing and new unit tests
Please review http://spark.apache.org/contributing.html before opening a pull request.