[SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics #22143
Conversation
@arunmahadevan @jose-torres @cloud-fan you may be interested in this one.
Jenkins, ok to test
Test build #94937 has finished for PR 22143 at commit
Report KafkaStreamWriter's written min and max offsets via CustomMetrics (force-pushed from 671bc81 to c812eff)
Test build #94941 has finished for PR 22143 at commit
arunmahadevan left a comment
Went through at a high level and left a few comments.
Seems we want to report something like below as sink metrics.
"minOffset" : {
"topic1" : {
"0" : 44, "1" : 95
}
},
"maxOffset" : {
"topic1" : {
"0" : 50, "1": 100
}
Before reviewing further, I would like to understand how this is useful, since we were already planning to report the number of output rows in the sink metrics (i.e. what use cases can be solved by directly exposing the Kafka offsets in addition to numOutputRows).
Also, should we report both max and min? (I assume the max of the current micro-batch would be the min for the next micro-batch.)
 * don't need to really send one.
 */
case object KafkaWriterCommitMessage extends WriterCommitMessage
case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
It's kind of odd that the writer commit message includes a source offset. IMO, it is better to define a KafkaSinkOffset or, if it can be common, something like KafkaOffsets.
I would have to rename the class itself to avoid adding an additional duplicate class. I would love to do that; I am just not sure whether it would be accepted.
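For illustration only, a minimal sketch of the suggested neutral offset type; the name KafkaOffsets and its reuse in the commit message are hypothetical and not part of this PR, and the WriterCommitMessage import assumes the Spark 2.x DataSourceV2 writer package:
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical sink-agnostic offset holder, shared by source and sink,
// so the commit message no longer refers to a "source" offset type.
case class KafkaOffsets(partitionToOffsets: Map[TopicPartition, Long])

// The commit message would then carry neutral per-partition min/max offsets.
case class KafkaWriterCommitMessage(minOffset: KafkaOffsets, maxOffset: KafkaOffsets)
  extends WriterCommitMessage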
  KafkaWriterCustomMetrics(minMax._1, minMax._2)
}

private def collate(messages: Array[WriterCommitMessage]):
It would be good to leave a comment on what this does. It seems to be computing the min/max offset per partition? If so, choosing an apt name for the function would make it clearer.
Thanks, I will rename it to something with minMax.
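As a rough sketch of what such a minMax-style helper could compute, assuming the PR's KafkaWriterCommitMessage carries KafkaSourceOffset values whose partitionToOffsets maps partitions to offsets (the helper name and exact shape are hypothetical):
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical helper: fold the per-task commit messages into a single
// per-partition minimum and maximum written offset.
def minMaxOffsets(
    messages: Array[WriterCommitMessage]): (Map[TopicPartition, Long], Map[TopicPartition, Long]) = {
  val typed = messages.collect { case m: KafkaWriterCommitMessage => m }
  val mins = typed
    .flatMap(_.minOffset.partitionToOffsets)
    .groupBy { case (tp, _) => tp }
    .map { case (tp, offsets) => tp -> offsets.map(_._2).min }
  val maxs = typed
    .flatMap(_.maxOffset.partitionToOffsets)
    .groupBy { case (tp, _) => tp }
    .map { case (tp, offsets) => tp -> offsets.map(_._2).max }
  (mins, maxs)
}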
import scala.collection.JavaConverters._

protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
  new ConcurrentHashMap[TopicPartition, Long]().asScala
Why is this a concurrent map?
This map is accessed in callbacks concurrently with respect to different partitions. This can be seen from the call hierarchy and the docs of Kafka's send method.
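A minimal sketch of the pattern being described, assuming offsets are recorded from the producer's send callback; the object and field names here are illustrative, not the PR's exact code:
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.TopicPartition

object OffsetTrackingExample {
  // Kafka invokes send callbacks on the producer's I/O thread, potentially for
  // several partitions at once, so the per-partition minimum must be updated atomically.
  val minOffsets = new ConcurrentHashMap[TopicPartition, java.lang.Long]()

  val trackingCallback: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception == null) {
        val tp = new TopicPartition(metadata.topic(), metadata.partition())
        // Keep the smallest offset observed so far for this partition.
        minOffsets.merge(tp, Long.box(metadata.offset()),
          new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
            override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
              Long.box(math.min(a, b))
          })
      }
    }
  }
}
A producer would pass the callback on each send, e.g. producer.send(record, OffsetTrackingExample.trackingCallback).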
@arunmahadevan min and max are used because there can be other writers to the same topic running in a different job. The messages sent would then become interleaved, and one would have to return a large number of intervals to be accurate. This approach gives sufficient information about where the data ended up being written, while also being resilient and simple. Would you recommend adding this as a Javadoc? To explain the motivation I updated the description of this PR using the description of the Jira. (To track data lineage we need to know, at least approximately, where data was read from and written to.)
@cloud-fan are you ok with merging the PR?
Currently this PR is waiting for SPARK-24748 to be re-merged, since it was reverted until the DataSource API v2 work is finished.
Can one of the admins verify this patch?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Report KafkaStreamWriter's written min and max offsets via CustomMetrics. This is important for data lineage projects like Spline. Related issue: https://issues.apache.org/jira/browse/SPARK-24647
To be able to track data lineage for Structured Streaming (I intend to implement this in the open source project Spline), the monitoring needs to track not only where the data was read from but also where the results were written to. To my knowledge, this is best implemented by monitoring StreamingQueryProgress. However, the written data offsets are currently not available on the Sink or StreamWriter interfaces. Implementing this as proposed would also bring symmetry to the StreamingQueryProgress fields sources and sink.
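For context, a minimal sketch of how a lineage tool could consume such sink metrics once they are reported, assuming they surface through the sink entry of StreamingQueryProgress; the listener class name is made up for this example:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener: a lineage tool such as Spline would watch query progress
// and read the sink's JSON, where the proposed min/max offsets would appear.
class SinkOffsetLineageListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress.sources already exposes start/end offsets; the proposal
    // would add comparable information for the sink side.
    println(event.progress.sink.json)
  }
}

// Registration on an existing SparkSession (assumed to exist):
// spark.streams.addListener(new SinkOffsetLineageListener)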
How was this patch tested?
Unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.