[SPARK-29223][SQL][SS] Enable global timestamp per topic while specifying offset by timestamp in Kafka source #25911
docs/structured-streaming-kafka-integration.md:

@@ -365,15 +365,16 @@ The following configurations are optional:
   <tr>
   <td>startingOffsetsByTimestamp</td>
   <td>json string
-  """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
+  """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ or """ {"topicA":{"-1": 1000}, "topicB": {"-1": 2000}} """
   </td>
-  <td>none (the value of <code>startingOffsets<code/> will apply)</td>
+  <td>none (the value of <code>startingOffsets</code> will apply)</td>
   <td>streaming and batch</td>
   <td>The start point of timestamp when a query is started, a json string specifying a starting timestamp for
   each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or
   equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist,
   the query will fail immediately to prevent unintended reads from such a partition. (This is a limitation as of now, and will be addressed in the near future.)<p/>
-  <p/>
+  You can specify a global timestamp per topic, which applies to all partitions in the topic, by setting the partition to "-1".
+  Note that within a topic the two styles are mutually exclusive: you can't specify both a global timestamp and per-partition timestamps.<p/>
   Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.<p/>
   For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to its <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.<p/>
   Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>): please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
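As a usage sketch, and assuming this patch is applied (the "-1" key is what the PR proposes and is not valid without it), a streaming query could set one timestamp for every partition of a topic as follows. The bootstrap server, topic name, and the `spark` session are placeholders:

```scala
// Sketch: start a streaming read of "topicA" from the first offsets whose
// timestamps are >= 1000, for every partition of the topic, using the
// proposed global "-1" partition key from this patch.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
  .option("subscribe", "topicA")                    // placeholder
  .option("startingOffsetsByTimestamp", """{"topicA": {"-1": 1000}}""")
  .load()
```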
@@ -401,15 +402,16 @@ The following configurations are optional:
   <tr>
   <td>endingOffsetsByTimestamp</td>
   <td>json string
-  """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
+  """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ or """ {"topicA":{"-1": 1000}, "topicB": {"-1": 2000}} """
   </td>
-  <td>latest</td>
+  <td>the value of <code>endingOffsets</code> will apply</td>
[Review comment (author)]: This actually has the same meaning as NOTE 2, but I mentioned it again here in case of misunderstanding. Please let me know if it is OK to just say latest.
   <td>batch query</td>
-  <td>The end point when a batch query is ended, a json string specifying an ending timesamp for each TopicPartition.
+  <td>The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.

[Review comment (author)]: Just fixed a nit.
   The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to
   the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will
   be set to latest.<p/>
-  <p/>
+  You can specify a global timestamp per topic, which applies to all partitions in the topic, by setting the partition to "-1".
+  Note that within a topic the two styles are mutually exclusive: you can't specify both a global timestamp and per-partition timestamps.<p/>
   Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.<p/>
   For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to its <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.<p/>
   Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>): please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
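A corresponding batch sketch under the same assumptions (placeholder names, patch applied) bounds the read on both sides:

```scala
// Sketch: batch-read "topicA" between two timestamps across all of its
// partitions, using the proposed global "-1" partition key on both bounds.
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
  .option("subscribe", "topicA")                    // placeholder
  .option("startingOffsetsByTimestamp", """{"topicA": {"-1": 1000}}""")
  .option("endingOffsetsByTimestamp", """{"topicA": {"-1": 2000}}""")
  .load()
```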
KafkaOffsetReader.scala:
@@ -49,6 +49,9 @@ private[kafka010] class KafkaOffsetReader(
     val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
     driverGroupIdPrefix: String) extends Logging {
+
+  import KafkaOffsetReader._
+
   /**
    * Used to ensure execute fetch operations execute in an UninterruptibleThread
    */
@@ -166,83 +169,118 @@ private[kafka010] class KafkaOffsetReader(
   def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
       reportDataLoss: String => Unit): KafkaSourceOffset = {
-    val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
-      assert(partitions.asScala == partitionOffsets.keySet,
-        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
-          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
-          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
-    }
-
-    val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ =>
-      partitionOffsets
-    }
-
-    val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched =>
-      partitionOffsets.foreach {
-        case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
-            off != KafkaOffsetRangeLimit.EARLIEST =>
-          if (fetched(tp) != off) {
-            reportDataLoss(
-              s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
-          }
-        case _ =>
-          // no real way to check that beginning or end is reasonable
-      }
-    }
-
-    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
-      fnAssertFetchedOffsets)
+    val fnRetrievePartitionOffsets: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = {
+      case (newParams, _) => newParams
+    }
+
+    fetchSpecificOffsets0(partitionOffsets, adjustParamsWithPartitionsForOffsets,
+      fnRetrievePartitionOffsets, assertFetchedOffsetsForOffsets(reportDataLoss))
   }
+
+  private def adjustParamsWithPartitionsForOffsets
[Review comment (author)]: NOTE: the method-local function values are extracted into named methods so that more code can be added to their definitions - they become too long to feel good keeping them all inside one method. I just left some of the short ones as they were.
+    : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (params, partitions) =>
+    assert(partitions.asScala == params.keySet,
+      "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+        "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+        s"Specified: ${params.keySet} Assigned: ${partitions.asScala}")
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $params")
+    params
+  }
+
+  private def assertFetchedOffsetsForOffsets(reportDataLoss: String => Unit)
+    : (TPToOffsets, TPToOffsets) => Unit = { case (newParams, fetched) =>
+    newParams.foreach {
+      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+          off != KafkaOffsetRangeLimit.EARLIEST =>
+        if (fetched(tp) != off) {
+          reportDataLoss(
+            s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+        }
+      case _ =>
+        // no real way to check that beginning or end is reasonable
+    }
+  }
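A minimal, self-contained sketch of the refactoring pattern the author's note describes: a method-local `val` holding a function value becomes a named private method that returns the function, leaving room for the body to grow. All names here are invented for illustration:

```scala
object RefactorSketch {
  // Before: the check lives in a local function value, so the enclosing
  // method grows with every rule added to the lambda body.
  def validateOld(xs: Set[Int]): Unit = {
    val fnAssert: Set[Int] => Unit = { s => assert(s.nonEmpty, "empty input") }
    fnAssert(xs)
  }

  // After: a named private method produces the same function value, keeping
  // the caller short while the rule set can grow inside the method definition.
  private def assertNonEmpty: Set[Int] => Unit = { s =>
    assert(s.nonEmpty, "empty input")
  }

  def validateNew(xs: Set[Int]): Unit = assertNonEmpty(xs)
}
```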
   def fetchSpecificTimestampBasedOffsets(
       partitionTimestamps: Map[TopicPartition, Long],
       failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
-    val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
-      assert(partitions.asScala == partitionTimestamps.keySet,
-        "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
-          s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
-    }
-
-    val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
-        val converted = partitionTimestamps.map { case (tp, timestamp) =>
-          tp -> java.lang.Long.valueOf(timestamp)
-        }.asJava
-
-        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
-          consumer.offsetsForTimes(converted)
-
-        offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetAndTimestamp != null, "No offset matched from request of " +
-              s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
-          }
-
-          if (offsetAndTimestamp == null) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetAndTimestamp.offset()
-          }
-        }.toMap
-      }
-    }
-
-    val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
-
-    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
-      fnAssertFetchedOffsets)
+    val fnAssertFetchedOffsets: (TPToOffsets, TPToOffsets) => Unit = { (_, _) => }
+
+    fetchSpecificOffsets0(partitionTimestamps,
+      adjustParamsWithPartitionsForTimestampBasedOffset,
+      retrievePartitionOffsetsForTimestampBasedOffset(failsOnNoMatchingOffset),
+      fnAssertFetchedOffsets)
   }
+
+  private def adjustParamsWithPartitionsForTimestampBasedOffset
+    : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (params, partitions) =>
+    val paramsGroupedByTopic = params.groupBy(_._1.topic())
+    val partitionsGroupedByTopic = partitions.asScala.groupBy(_.topic())
+
+    assert(paramsGroupedByTopic.keySet == partitionsGroupedByTopic.keySet,
+      s"Not all specified topics are assigned. Specified: ${params.keySet} " +
+        s"Assigned: ${partitions.asScala}")
+
+    val newParams: Map[TopicPartition, Long] = paramsGroupedByTopic.map {
+      case (topic, tpToOffset) =>
+        if (tpToOffset.keySet.map(_.partition()).contains(GLOBAL_PARTITION_NUM)) {
[Review comment (author)]: NOTE: This if statement is the only effective change. Others are mostly refactoring.
+          // global timestamp has been set for all partitions in topic
+
+          // we disallow overriding timestamp per partition for simplicity
+          assert(tpToOffset.size == 1, "Global timestamp for topic cannot be set along with " +
+            "specifying partition(s). Specify only global timestamp for each topic if you want " +
+            s"to use global timestamp. Configuration error on topic $topic, specified: $params")
+
+          val partsForTopic = partitionsGroupedByTopic(topic)
+          val timestampOffset = tpToOffset(new TopicPartition(topic, GLOBAL_PARTITION_NUM))
+
+          partsForTopic.map { part => part -> timestampOffset }.toMap
+        } else {
+          val partsForTopic = partitionsGroupedByTopic(topic)
+          assert(tpToOffset.keySet == partsForTopic.toSet,
+            "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify" +
+              s" all TopicPartitions. Specified: ${params.keySet} Assigned: " +
+              s"${partitions.asScala}")
+          tpToOffset
+        }
+    }.reduceLeft { (acc: TPToOffsets, tpToOffsets: TPToOffsets) => acc ++ tpToOffsets }
+
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $newParams")
+    newParams
+  }
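Since the `if` branch above is the PR's only behavioral change, here is a hedged, standalone sketch of what it computes. `expandGlobalTimestamps` is an invented name, not the PR's method; it only mirrors the logic under the same assumption: a topic keyed by partition -1 must have exactly one entry, and its timestamp is copied to every assigned partition of that topic.

```scala
import org.apache.kafka.common.TopicPartition

object GlobalTimestampSketch {
  val GLOBAL_PARTITION_NUM = -1

  // Invented helper mirroring adjustParamsWithPartitionsForTimestampBasedOffset:
  // a topic keyed by partition -1 gets its timestamp copied to all assigned
  // partitions; otherwise the per-partition timestamps pass through unchanged.
  def expandGlobalTimestamps(
      params: Map[TopicPartition, Long],
      assigned: Set[TopicPartition]): Map[TopicPartition, Long] = {
    params.groupBy(_._1.topic()).flatMap { case (topic, tpToTs) =>
      if (tpToTs.keySet.exists(_.partition() == GLOBAL_PARTITION_NUM)) {
        require(tpToTs.size == 1, s"can't mix global and per-partition timestamps: $topic")
        val ts = tpToTs(new TopicPartition(topic, GLOBAL_PARTITION_NUM))
        assigned.filter(_.topic() == topic).map(_ -> ts)
      } else {
        tpToTs
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val assigned = Set(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1))
    val params = Map(new TopicPartition("topicA", GLOBAL_PARTITION_NUM) -> 1000L)
    // prints: Map(topicA-0 -> 1000, topicA-1 -> 1000)
    println(expandGlobalTimestamps(params, assigned))
  }
}
```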
+
+  private def retrievePartitionOffsetsForTimestampBasedOffset(failsOnNoMatchingOffset: Boolean)
+    : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (newPartitionTimestamps, _) =>
+    val converted = newPartitionTimestamps.map { case (tp, timestamp) =>
+      tp -> java.lang.Long.valueOf(timestamp)
+    }.asJava
+
+    val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
+      consumer.offsetsForTimes(converted)
+
+    offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
+      if (failsOnNoMatchingOffset) {
+        assert(offsetAndTimestamp != null, "No offset matched from request of " +
+          s"topic-partition $tp and timestamp ${newPartitionTimestamps(tp)}.")
+      }
+
+      if (offsetAndTimestamp == null) {
+        tp -> KafkaOffsetRangeLimit.LATEST
+      } else {
+        tp -> offsetAndTimestamp.offset()
+      }
+    }.toMap
+  }
   private def fetchSpecificOffsets0(
-      fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
-      fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],
-      fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit): KafkaSourceOffset = {
+      partitionTimestamps: TPToOffsets,
+      fnAdjustParamsWithPartitions: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets,
+      fnRetrievePartitionOffsets: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets,
+      fnAssertFetchedOffsets: (TPToOffsets, TPToOffsets) => Unit): KafkaSourceOffset = {
     val fetched = partitionsAssignedToConsumer {
       partitions => {
-        fnAssertParametersWithPartitions(partitions)
-
-        val partitionOffsets = fnRetrievePartitionOffsets(partitions)
+        val newPartTimestamps = fnAdjustParamsWithPartitions(partitionTimestamps, partitions)
+        val partitionOffsets = fnRetrievePartitionOffsets(newPartTimestamps, partitions)

         partitionOffsets.foreach {
           case (tp, KafkaOffsetRangeLimit.LATEST) =>
@@ -252,14 +290,16 @@ private[kafka010] class KafkaOffsetReader(
           case (tp, off) => consumer.seek(tp, off)
         }

-        partitionOffsets.map {
+        val fetchedOffsets = partitionOffsets.map {
           case (tp, _) => tp -> consumer.position(tp)
         }
+
+        fnAssertFetchedOffsets(newPartTimestamps, fetchedOffsets)
+
+        fetchedOffsets
       }
     }

-    fnAssertFetchedOffsets(fetched)
-
     KafkaSourceOffset(fetched)
   }
@@ -478,3 +518,11 @@ private[kafka010] class KafkaOffsetReader(
     _consumer = null // will automatically get reinitialized again
   }
 }
+
+private[kafka010] object KafkaOffsetReader {
+  // The type alias shortens the type to reduce characters per line: please avoid exposing
+  // this type outside of KafkaOffsetReader, e.g. in parameter/return types of public methods.
+  private type TPToOffsets = Map[TopicPartition, Long]
+
+  val GLOBAL_PARTITION_NUM = -1
+}
[Review comment]: Just fixed a nit which IDEA warns about.