From f8da5763e76d77171f1169fba58710b42251b4b4 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 20 May 2021 18:21:09 +0900 Subject: [PATCH 1/5] [SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source --- .../structured-streaming-kafka-integration.md | 37 +++- .../sql/kafka010/KafkaContinuousStream.scala | 2 + .../sql/kafka010/KafkaMicroBatchStream.scala | 2 + .../sql/kafka010/KafkaOffsetRangeLimit.scala | 7 + .../sql/kafka010/KafkaOffsetReader.scala | 13 ++ .../sql/kafka010/KafkaOffsetReaderAdmin.scala | 34 ++++ .../kafka010/KafkaOffsetReaderConsumer.scala | 36 ++++ .../spark/sql/kafka010/KafkaSource.scala | 2 + .../sql/kafka010/KafkaSourceProvider.scala | 87 ++++++--- .../kafka010/KafkaMicroBatchSourceSuite.scala | 181 +++++++++++++++--- .../sql/kafka010/KafkaRelationSuite.scala | 34 ++++ 11 files changed, 374 insertions(+), 61 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 0411b3076b55..492e3b256ea2 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -362,6 +362,25 @@ The following configurations are optional: + + + + + + + + + + + + + + - + @@ -389,16 +384,11 @@ The following configurations are optional: @@ -423,14 +413,9 @@ The following configurations are optional: @@ -441,15 +426,9 @@ The following configurations are optional: @@ -547,6 +526,17 @@ The following configurations are optional:
 <tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
+<tr>
+  <td>startingTimestamp</td>
+  <td>timestamp string e.g. "1000"</td>
+  <td>none (next preference is startingOffsetsByTimestamp)</td>
+  <td>streaming and batch</td>
+  <td>The start point of timestamp when a query is started, a string specifying a starting timestamp for
+  all partitions in topics being subscribed. The returned offset for each partition is the earliest offset whose timestamp is greater than or
+  equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist,
+  the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)

+

+ Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

+ For more details on KafkaConsumer.offsetsForTimes, please refer to its javadoc.

+ Also, the meaning of the timestamp here can vary according to the Kafka configuration (log.message.timestamp.type): please refer to the Kafka documentation for further details.

+ Note: This option requires Kafka 0.10.1.0 or higher.

+ Note2: startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.

+ Note3: For streaming queries, this only applies when a new query is started, and that resuming will
+ always pick up from where the query left off. Newly discovered partitions during a query will start at
+ earliest.
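As an illustration for reviewers (not part of the patch), a minimal sketch of how the new option might be used from a streaming query, assuming an existing SparkSession named `spark`; the broker address and topic name are placeholders:

```scala
// Hypothetical usage: start every subscribed partition at the first record whose
// timestamp (epoch millis) is greater than or equal to the given value.
val startMillis = System.currentTimeMillis() - 60 * 60 * 1000  // e.g. one hour ago

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "events")                         // placeholder topic
  .option("startingTimestamp", startMillis.toString)     // applies to all partitions
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```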

   <td>startingOffsetsByTimestamp</td>
   <td>json string
@@ -398,12 +417,28 @@ The following configurations are optional:
   always pick up from where the query left off. Newly discovered partitions during a query will start at
   earliest.
+<tr>
+  <td>endingTimestamp</td>
+  <td>timestamp string e.g. "1000"</td>
+  <td>none (next preference is endingOffsetsByTimestamp)</td>
+  <td>batch query</td>
+  <td>The end point when a batch query is ended, a json string specifying an ending timestamp for
+  all partitions in topics being subscribed. The returned offset for each partition is the earliest offset whose timestamp is greater than or
+  equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will be set to latest.

+

+ Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

+ For more details on KafkaConsumer.offsetsForTimes, please refer to its javadoc.

+ Also, the meaning of the timestamp here can vary according to the Kafka configuration (log.message.timestamp.type): please refer to the Kafka documentation for further details.

+ Note: This option requires Kafka 0.10.1.0 or higher.

+ Note2: endingTimestamp takes precedence over endingOffsetsByTimestamp and endingOffsets.

+
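Similarly, a hedged sketch (not part of the patch) of a bounded batch read that combines the two new options; the broker, topic, and timestamp values below are illustrative only:

```scala
// Hypothetical batch read of a time window: both bounds apply to all partitions of the
// subscribed topics; partitions with no record at or after endingTimestamp fall back to latest.
val windowed = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "events")                         // placeholder topic
  .option("startingTimestamp", "1621468800000")          // lower bound (epoch millis)
  .option("endingTimestamp", "1621555200000")            // upper bound (epoch millis)
  .load()
```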

endingOffsetsByTimestamp json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ latestnone (next preference is endingOffsets) batch query The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 0b549870a348..abdb322156f2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -72,6 +72,8 @@ class KafkaContinuousStream( case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) case SpecificTimestampRangeLimit(p) => offsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true) + case GlobalTimestampRangeLimit(ts) => offsetReader.fetchGlobalTimestampBasedOffsets( + ts, failsOnNoMatchingOffset = true) } logInfo(s"Initial offsets: $offsets") offsets diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 822184842deb..cdd8aa8d8acc 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -166,6 +166,8 @@ private[kafka010] class KafkaMicroBatchStream( kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) case SpecificTimestampRangeLimit(p) => kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true) + case GlobalTimestampRangeLimit(ts) => + kafkaOffsetReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala index d64b5d4f7e9e..5c9a01d6d1db 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala @@ -49,6 +49,13 @@ private[kafka010] case class SpecificOffsetRangeLimit( private[kafka010] case class SpecificTimestampRangeLimit( topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit +/** + * Represents the desire to bind to earliest offset which timestamp for the offset is equal or + * greater than specific timestamp. This applies the timestamp to the all topics/partitions. 
+ */ +private[kafka010] case class GlobalTimestampRangeLimit( + timestamp: Long) extends KafkaOffsetRangeLimit + private[kafka010] object KafkaOffsetRangeLimit { /** * Used to denote offset range limits that are resolved via Kafka diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 546970507a2e..9c957d3bff7f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -77,6 +77,19 @@ private[kafka010] trait KafkaOffsetReader { partitionTimestamps: Map[TopicPartition, Long], failsOnNoMatchingOffset: Boolean): KafkaSourceOffset + /** + * Resolves the specific offsets based on timestamp per all topic-partitions being subscribed. + * The returned offset for each partition is the earliest offset whose timestamp is greater + * than or equal to the given timestamp in the corresponding partition. If the matched offset + * doesn't exist, depending on `failsOnNoMatchingOffset` parameter, the offset will be set to + * latest or this method throws an error. + * + * @param timestamp the timestamp. + * @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found. + */ + def fetchGlobalTimestampBasedOffsets(timestamp: Long, + failsOnNoMatchingOffset: Boolean): KafkaSourceOffset + /** * Fetch the earliest offsets for the topic partitions that are indicated * in the [[ConsumerStrategy]]. diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala index 6f4cb895f363..ceca011830ad 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala @@ -138,6 +138,9 @@ private[kafka010] class KafkaOffsetReaderAdmin( case SpecificTimestampRangeLimit(partitionTimestamps) => fetchSpecificTimestampBasedOffsets(partitionTimestamps, failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets + case GlobalTimestampRangeLimit(timestamp) => + fetchGlobalTimestampBasedOffsets(timestamp, + failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets } } @@ -193,6 +196,37 @@ private[kafka010] class KafkaOffsetReaderAdmin( fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets) } + + override def fetchGlobalTimestampBasedOffsets( + timestamp: Long, + failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = { + val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => + logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to $timestamp") + } + + val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { tps => + val listOffsetsParams = tps.asScala.map { tp => + tp -> OffsetSpec.forTimestamp(timestamp) + }.toMap.asJava + admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.map { + case (tp, offsetSpec) => + if (failsOnNoMatchingOffset) { + assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " + + s"matched from request of topic-partition $tp and timestamp " + + s"$timestamp.") + } + + if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) { + tp -> KafkaOffsetRangeLimit.LATEST + } else { + tp -> offsetSpec.offset() + } + }.toMap + } + + fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets) + } + private def fetchSpecificOffsets0( fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit, fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala index ead819e4c27a..0733e6d76157 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala @@ -160,6 +160,9 @@ private[kafka010] class KafkaOffsetReaderConsumer( case SpecificTimestampRangeLimit(partitionTimestamps) => fetchSpecificTimestampBasedOffsets(partitionTimestamps, failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets + case GlobalTimestampRangeLimit(timestamp) => + fetchGlobalTimestampBasedOffsets(timestamp, + failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets } } @@ -234,6 +237,39 @@ private[kafka010] class KafkaOffsetReaderConsumer( fnAssertFetchedOffsets) } + override def fetchGlobalTimestampBasedOffsets( + timestamp: Long, + failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = { + val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => + logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to $timestamp") + } + + val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { tps => + val converted = tps.asScala.map(_ -> java.lang.Long.valueOf(timestamp)).toMap.asJava + + val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] = + consumer.offsetsForTimes(converted) + + offsetForTime.asScala.map { case (tp, offsetAndTimestamp) => + if (failsOnNoMatchingOffset) { + assert(offsetAndTimestamp != null, "No offset matched from request of " + + s"topic-partition $tp and timestamp $timestamp.") + } + + if (offsetAndTimestamp == null) { + tp -> KafkaOffsetRangeLimit.LATEST + } else { + tp -> offsetAndTimestamp.offset() + } + }.toMap + } + + val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => } + + fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets, + fnAssertFetchedOffsets) + } + private def fetchSpecificOffsets0( fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit, fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long], diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 1e17f9a7407a..a3b8c6fc31f9 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -104,6 +104,8 @@ private[kafka010] class KafkaSource( case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss) case SpecificTimestampRangeLimit(p) => kafkaReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true) + case GlobalTimestampRangeLimit(ts) => + kafkaReader.fetchGlobalTimestampBasedOffsets(ts, failsOnNoMatchingOffset = true) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 0c891cd725ae..3f58b3c0a158 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -90,8 +90,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParameters, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, + LatestOffsetRangeLimit) val kafkaOffsetReader = KafkaOffsetReader.build( strategy(caseInsensitiveParameters), @@ -128,13 +129,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters) val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParameters, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) + caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, + EarliestOffsetRangeLimit) 
assert(startingRelationOffsets != LatestOffsetRangeLimit) val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParameters, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveParameters, ENDING_TIMESTAMP_OPTION_KEY, + ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY, + LatestOffsetRangeLimit) assert(endingRelationOffsets != EarliestOffsetRangeLimit) val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean @@ -334,8 +337,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister private def validateBatchOptions(params: CaseInsensitiveMap[String]) = { // Batch specific options KafkaSourceProvider.getKafkaOffsetRangeLimit( - params, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, - EarliestOffsetRangeLimit) match { + params, STARTING_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match { case EarliestOffsetRangeLimit => // good to go case LatestOffsetRangeLimit => throw new IllegalArgumentException("starting offset can't be latest " + @@ -348,11 +351,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister case _ => // ignore } case _: SpecificTimestampRangeLimit => // good to go + case _: GlobalTimestampRangeLimit => // good to go } KafkaSourceProvider.getKafkaOffsetRangeLimit( - params, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY, - LatestOffsetRangeLimit) match { + params, ENDING_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match { case EarliestOffsetRangeLimit => throw new IllegalArgumentException("ending offset can't be earliest " + "for batch queries on Kafka") @@ -365,6 +369,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister case _ => // ignore } case _: SpecificTimestampRangeLimit => // good to go + case _: GlobalTimestampRangeLimit => // good to go } validateGeneralOptions(params) @@ -421,12 +426,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, + EarliestOffsetRangeLimit) val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, ENDING_TIMESTAMP_OPTION_KEY, + ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY, + LatestOffsetRangeLimit) new KafkaBatch( strategy(caseInsensitiveOptions), @@ -449,8 +456,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, + 
LatestOffsetRangeLimit) val kafkaOffsetReader = KafkaOffsetReader.build( strategy(caseInsensitiveOptions), @@ -478,8 +486,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, + LatestOffsetRangeLimit) val kafkaOffsetReader = KafkaOffsetReader.build( strategy(caseInsensitiveOptions), @@ -521,6 +530,8 @@ private[kafka010] object KafkaSourceProvider extends Logging { private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets" private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp" private[kafka010] val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp" + private[kafka010] val STARTING_TIMESTAMP_OPTION_KEY = "startingtimestamp" + private[kafka010] val ENDING_TIMESTAMP_OPTION_KEY = "endingtimestamp" private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss" private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions" private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger" @@ -564,20 +575,38 @@ private[kafka010] object KafkaSourceProvider extends Logging { def getKafkaOffsetRangeLimit( params: CaseInsensitiveMap[String], + globalOffsetTimestampOptionKey: String, offsetByTimestampOptionKey: String, offsetOptionKey: String, defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = { - params.get(offsetByTimestampOptionKey).map(_.trim) match { - case Some(json) => SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json)) - case None => - params.get(offsetOptionKey).map(_.trim) match { - case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => - LatestOffsetRangeLimit - case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => - EarliestOffsetRangeLimit - case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) - case None => defaultOffsets - } + // The order below represents "preferences" + + // 1. global timestamp + if (params.contains(globalOffsetTimestampOptionKey)) { + val tsStr = params(globalOffsetTimestampOptionKey).trim + try { + val ts = tsStr.toLong + return GlobalTimestampRangeLimit(ts) + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"Expected a single long value, got $tsStr") + } + } + + // 2. timestamp per topic partition + if (params.contains(offsetByTimestampOptionKey)) { + val json = params(offsetByTimestampOptionKey).trim + return SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json)) + } + + // 3. 
latest/earliest/offset + params.get(offsetOptionKey).map(_.trim) match { + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => + LatestOffsetRangeLimit + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => + EarliestOffsetRangeLimit + case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) + case None => defaultOffsets } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 4826e7a25144..ad8939d920a6 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1470,6 +1470,16 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { "failOnDataLoss" -> failOnDataLoss.toString) } + test(s"assign from global timestamp per topic (failOnDataLoss: $failOnDataLoss)") { + val topic = newTopic() + testFromGlobalTimestamp( + topic, + failOnDataLoss = failOnDataLoss, + addPartitions = false, + "assign" -> assignString(topic, 0 to 4), + "failOnDataLoss" -> failOnDataLoss.toString) + } + test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") { val topic = newTopic() testFromLatestOffsets( @@ -1499,6 +1509,13 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { "subscribe" -> topic) } + test(s"subscribing topic by name from global timestamp per topic" + + s" (failOnDataLoss: $failOnDataLoss)") { + val topic = newTopic() + testFromGlobalTimestamp(topic, failOnDataLoss = failOnDataLoss, addPartitions = true, + "subscribe" -> topic) + } + test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") { val topicPrefix = newTopic() val topic = topicPrefix + "-suffix" @@ -1538,6 +1555,17 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { addPartitions = true, "subscribePattern" -> s"$topicPrefix-.*") } + + test(s"subscribing topic by pattern from global timestamp per topic " + + s"(failOnDataLoss: $failOnDataLoss)") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-suffix" + testFromGlobalTimestamp( + topic, + failOnDataLoss = failOnDataLoss, + addPartitions = true, + "subscribePattern" -> s"$topicPrefix-.*") + } } test("bad source options") { @@ -1608,7 +1636,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""", SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) { val offset = getKafkaOffsetRangeLimit( - CaseInsensitiveMap[String](Map(optionKey -> optionValue)), "dummy", optionKey, + CaseInsensitiveMap[String](Map(optionKey -> optionValue)), "dummy", "dummy", optionKey, answer) assert(offset === answer) } @@ -1617,7 +1645,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit), (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) { val offset = getKafkaOffsetRangeLimit( - CaseInsensitiveMap[String](Map.empty), "dummy", optionKey, answer) + CaseInsensitiveMap[String](Map.empty), "dummy", "dummy", optionKey, answer) assert(offset === answer) } } @@ -1687,27 +1715,11 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { failOnDataLoss: Boolean, addPartitions: Boolean, options: (String, String)*): Unit = { - def sendMessages(topic: String, msgs: 
Seq[String], part: Int, ts: Long): Unit = { - val records = msgs.map { msg => - new RecordBuilder(topic, msg).partition(part).timestamp(ts).build() - } - testUtils.sendMessages(records) - } - testUtils.createTopic(topic, partitions = 5) val firstTimestamp = System.currentTimeMillis() - 5000 - sendMessages(topic, Array(-20).map(_.toString), 0, firstTimestamp) - sendMessages(topic, Array(-10).map(_.toString), 1, firstTimestamp) - sendMessages(topic, Array(0, 1).map(_.toString), 2, firstTimestamp) - sendMessages(topic, Array(10, 11).map(_.toString), 3, firstTimestamp) - sendMessages(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp) - val secondTimestamp = firstTimestamp + 1000 - sendMessages(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp) - sendMessages(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp) - sendMessages(topic, Array(2).map(_.toString), 2, secondTimestamp) - sendMessages(topic, Array(12).map(_.toString), 3, secondTimestamp) + setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp) // no data after second timestamp for partition 4 require(testUtils.getLatestOffsets(Set(topic)).size === 5) @@ -1719,18 +1731,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { ) ++ Map(new TopicPartition(topic, 4) -> firstTimestamp) val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps) - val reader = spark - .readStream - .format("kafka") - .option("startingOffsetsByTimestamp", startingTimestamps) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("failOnDataLoss", failOnDataLoss.toString) - options.foreach { case (k, v) => reader.option(k, v) } - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss, + options: _*) testStream(mapped)( makeSureGetOffsetCalled, @@ -1758,6 +1760,123 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { ) } + private def testFromGlobalTimestamp( + topic: String, + failOnDataLoss: Boolean, + addPartitions: Boolean, + options: (String, String)*): Unit = { + testUtils.createTopic(topic, partitions = 5) + + val firstTimestamp = System.currentTimeMillis() - 5000 + val secondTimestamp = firstTimestamp + 1000 + setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp) + // here we should add records in partition 4 which match with second timestamp + // as the query will break if there's no matching records + sendMessagesWithTimestamp(topic, Array(23, 24).map(_.toString), 4, secondTimestamp) + + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + // we intentionally starts from second timestamp for all partitions + // via setting global partition + val mapped = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss, + options: _*) + testStream(mapped)( + makeSureGetOffsetCalled, + Execute { q => + // wait to reach the last offset in every partition + val partAndOffsets = (0 to 4).map(new TopicPartition(topic, _)).map { tp => + if (tp.partition() < 4) { + tp -> 3L + } else { + tp -> 5L // we added 2 more records to partition 4 + } + }.toMap + q.awaitOffset(0, KafkaSourceOffset(partAndOffsets), streamingTimeout.toMillis) + }, + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24), + StopStream, + StartStream(), + CheckAnswer(-21, -22, -11, 
-12, 2, 12, 23, 24), // Should get the data back on recovery + StopStream, + AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped + StartStream(), + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32), // Should get the added data + AssertOnQuery("Add partitions") { query: StreamExecution => + if (addPartitions) setTopicPartitions(topic, 10, query) + true + }, + AddKafkaData(Set(topic), 40, 41, 42, 43, 44)(ensureDataInMultiplePartition = true), + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32, 40, 41, 42, 43, 44), + StopStream + ) + } + + private def sendMessagesWithTimestamp( + topic: String, + msgs: Seq[String], + part: Int, + ts: Long): Unit = { + val records = msgs.map { msg => + new RecordBuilder(topic, msg).partition(part).timestamp(ts).build() + } + testUtils.sendMessages(records) + } + + private def setupTestMessagesForTestOnTimestampOffsets( + topic: String, + firstTimestamp: Long, + secondTimestamp: Long): Unit = { + sendMessagesWithTimestamp(topic, Array(-20).map(_.toString), 0, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(-10).map(_.toString), 1, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(0, 1).map(_.toString), 2, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(10, 11).map(_.toString), 3, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp) + + sendMessagesWithTimestamp(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp) + sendMessagesWithTimestamp(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp) + sendMessagesWithTimestamp(topic, Array(2).map(_.toString), 2, secondTimestamp) + sendMessagesWithTimestamp(topic, Array(12).map(_.toString), 3, secondTimestamp) + // no data after second timestamp for partition 4 + } + + private def setupDataFrameForTestOnTimestampOffsets( + startingTimestamps: String, + failOnDataLoss: Boolean, + options: (String, String)*): Dataset[_] = { + val reader = spark + .readStream + .format("kafka") + .option("startingOffsetsByTimestamp", startingTimestamps) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("failOnDataLoss", failOnDataLoss.toString) + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + mapped + } + + private def setupDataFrameForTestOnGlobalTimestamp( + startingTimestamp: Long, + failOnDataLoss: Boolean, + options: (String, String)*): Dataset[_] = { + val reader = spark + .readStream + .format("kafka") + .option("startingTimestamp", startingTimestamp) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("failOnDataLoss", failOnDataLoss.toString) + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + mapped + } + test("Kafka column types") { val now = System.currentTimeMillis() val topic = newTopic() diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 9cec37e708db..59729d5487ab 100644 --- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -263,6 +263,15 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession }, topic, 0 to 19) } + test("global timestamp provided for starting and ending") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // timestamp both presented: starting "first" ending "finalized" + verifyTimestampRelatedQueryResult({ df => + df.option("startingTimestamp", timestamps(1)).option("endingTimestamp", timestamps(2)) + }, topic, 10 to 19) + } + test("no matched offset for timestamp - startingOffsets") { val (topic, timestamps) = prepareTimestampRelatedUnitTest @@ -284,6 +293,31 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession TestUtils.assertExceptionMsg(e, "No offset matched from request") } + test("specifying both global timestamp and specific timestamp for partition") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + /* + The test will set both configs differently: + + * global timestamp + starting only presented as "third", and ending not presented + + * specific timestamp for partition + starting only presented as "second", and ending not presented + + Here we expect global timestamp will take effect. + */ + verifyTimestampRelatedQueryResult({ df => + val startTopicTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*) + val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + + df + .option("startingTimestamp", timestamps(2)) + .option("startingOffsetsByTimestamp", startingTimestamps) + }, topic, 20 to 29) + } + test("no matched offset for timestamp - endingOffsets") { val (topic, timestamps) = prepareTimestampRelatedUnitTest From ec1f662234dc3e986ae460077e16a6c557f03cf7 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 21 May 2021 12:06:45 +0900 Subject: [PATCH 2/5] Refine doc --- .../structured-streaming-kafka-integration.md | 54 ++++++++----------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 492e3b256ea2..26ec5a063293 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -368,16 +368,11 @@ The following configurations are optional: none (next preference is startingOffsetsByTimestamp) streaming and batch The start point of timestamp when a query is started, a string specifying a starting timestamp for - all partitions in topics being subscribed. The returned offset for each partition is the earliest offset whose timestamp is greater than or - equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist, + all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)

- Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

- For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details.

- Also the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details.

- Note: This option requires Kafka 0.10.1.0 or higher.

- Note2: startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.

- Note3: For streaming queries, this only applies when a new query is started, and that resuming will
+ Note1: startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.

+ Note2: For streaming queries, this only applies when a new query is started, and that resuming will
  always pick up from where the query left off. Newly discovered partitions during a query will start at
  earliest.

none (the value of startingOffsets will apply) streaming and batch The start point of timestamp when a query is started, a json string specifying a starting timestamp for - each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or - equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist, + each TopicPartition. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)

- Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

- For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details.

- Also the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details.

- Note: This option requires Kafka 0.10.1.0 or higher.

- Note2: startingOffsetsByTimestamp takes precedence over startingOffsets.

- Note3: For streaming queries, this only applies when a new query is started, and that resuming will
+ Note1: startingOffsetsByTimestamp takes precedence over startingOffsets.

+ Note2: For streaming queries, this only applies when a new query is started, and that resuming will
  always pick up from where the query left off. Newly discovered partitions during a query will start at
  earliest.

none (next preference is endingOffsetsByTimestamp) batch query The end point when a batch query is ended, a json string specifying an ending timestamp for - all partitions in topics being subscribed. The returned offset for each partition is the earliest offset whose timestamp is greater than or - equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will be set to latest.

-

- Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

- For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details.

- Also the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details.

- Note: This option requires Kafka 0.10.1.0 or higher.

- Note2: endingTimestamp takes precedence over endingOffsetsByTimestamp and endingOffsets.

+ all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the offset will
+ be set to latest.

+ Note: endingTimestamp takes precedence over endingOffsetsByTimestamp and endingOffsets.

none (next preference is endingOffsets) batch query The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. - The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to - the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will + Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the offset will be set to latest.

-

- Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

- For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details.

- Also the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details.

- Note: This option requires Kafka 0.10.1.0 or higher.

- Note2: endingOffsetsByTimestamp takes precedence over endingOffsets.
+ Note: endingOffsetsByTimestamp takes precedence over endingOffsets.

+### Details on timestamp offset options + +The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. +The behavior varies across options if the matched offset doesn't exist - check the description of each option. + +Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value. +For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details. +Also, the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details. + +Timestamp offset options require Kafka 0.10.1.0 or higher. + ### Offset fetching In Spark 3.0 and before Spark uses KafkaConsumer for offset fetching which could cause infinite wait in the driver. From afd6c1175bd6ba9fcfd2d05b6697dc3761f1ae5c Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 25 May 2021 09:28:05 +0900 Subject: [PATCH 3/5] Reflect review comments --- .../structured-streaming-kafka-integration.md | 16 +++++----- .../sql/kafka010/KafkaSourceProvider.scala | 32 +++++++++---------- .../sql/kafka010/KafkaRelationSuite.scala | 25 +++++++++++---- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 26ec5a063293..76f8c50da575 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -368,7 +368,7 @@ The following configurations are optional: none (next preference is startingOffsetsByTimestamp) streaming and batch The start point of timestamp when a query is started, a string specifying a starting timestamp for - all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, + all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset, the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)

Note1: startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.
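To make the precedence concrete, a small sketch (illustrative values, not from the patch) in which all three starting options are supplied; only startingTimestamp should take effect, matching the new "preferences on offset related options" test:

```scala
// Hypothetical example: startingTimestamp wins over startingOffsetsByTimestamp,
// which in turn would win over startingOffsets if the global timestamp were absent.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")                  // placeholder
  .option("subscribe", "events")                                        // placeholder
  .option("startingTimestamp", "3000")                                  // 1st preference: used
  .option("startingOffsetsByTimestamp", """{"events": {"0": 2000}}""")  // 2nd preference: ignored
  .option("startingOffsets", "earliest")                                // 3rd preference: ignored
  .load()
```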

@@ -381,10 +381,10 @@ The following configurations are optional: json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ - none (the value of startingOffsets will apply) + none (next preference is startingOffsets) streaming and batch The start point of timestamp when a query is started, a json string specifying a starting timestamp for - each TopicPartition. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, + each TopicPartition. Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset, the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)

Note1: startingOffsetsByTimestamp takes precedence over startingOffsets.
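For completeness, a hypothetical helper (not part of the patch) showing one way to build the per-partition JSON that startingOffsetsByTimestamp expects, assigning the same timestamp to every partition of a topic; the topic name, partition count, and timestamp are illustrative only:

```scala
// Builds e.g. {"events": {"0": 1000, "1": 1000, "2": 1000}} for ("events", 3, 1000L).
def timestampJson(topic: String, numPartitions: Int, ts: Long): String = {
  val perPartition = (0 until numPartitions).map(p => s""""$p": $ts""").mkString(", ")
  s"""{"$topic": {$perPartition}}"""
}
```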

@@ -413,8 +413,8 @@ The following configurations are optional: none (next preference is endingOffsetsByTimestamp) batch query The end point when a batch query is ended, a json string specifying an ending timestamp for - all partitions in topics being subscribed. Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the offset will - be set to latest.

+ all partitions in topics being subscribed. Please refer the details on timestamp offset options below.
+ If Kafka doesn't return the matched offset, the offset will be set to latest.

Note: endingTimestamp takes precedence over endingOffsetsByTimestamp and endingOffsets.

@@ -426,8 +426,8 @@ The following configurations are optional: none (next preference is endingOffsets) batch query The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. - Please refer the details on timestamp offset options below. If the matched offset doesn't exist, the offset will - be set to latest.

+ Please refer the details on timestamp offset options below. If Kafka doesn't return the matched offset,
+ the offset will be set to latest.

Note: endingOffsetsByTimestamp takes precedence over endingOffsets. @@ -529,7 +529,7 @@ The following configurations are optional: ### Details on timestamp offset options The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. -The behavior varies across options if the matched offset doesn't exist - check the description of each option. +The behavior varies across options if Kafka doesn't return the matched offset - check the description of each option. Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value. For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details. diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 3f58b3c0a158..b6092e161d1e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -581,32 +581,30 @@ private[kafka010] object KafkaSourceProvider extends Logging { defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = { // The order below represents "preferences" - // 1. global timestamp if (params.contains(globalOffsetTimestampOptionKey)) { + // 1. global timestamp val tsStr = params(globalOffsetTimestampOptionKey).trim try { val ts = tsStr.toLong - return GlobalTimestampRangeLimit(ts) + GlobalTimestampRangeLimit(ts) } catch { case _: NumberFormatException => throw new IllegalArgumentException(s"Expected a single long value, got $tsStr") } - } - - // 2. timestamp per topic partition - if (params.contains(offsetByTimestampOptionKey)) { + } else if (params.contains(offsetByTimestampOptionKey)) { + // 2. timestamp per topic partition val json = params(offsetByTimestampOptionKey).trim - return SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json)) - } - - // 3. latest/earliest/offset - params.get(offsetOptionKey).map(_.trim) match { - case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => - LatestOffsetRangeLimit - case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => - EarliestOffsetRangeLimit - case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) - case None => defaultOffsets + SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json)) + } else { + // 3. 
latest/earliest/offset + params.get(offsetOptionKey).map(_.trim) match { + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => + LatestOffsetRangeLimit + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => + EarliestOffsetRangeLimit + case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) + case None => defaultOffsets + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 59729d5487ab..6e1ece3f436a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -293,7 +293,7 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession TestUtils.assertExceptionMsg(e, "No offset matched from request") } - test("specifying both global timestamp and specific timestamp for partition") { + test("preferences on offset related options") { val (topic, timestamps) = prepareTimestampRelatedUnitTest /* @@ -305,17 +305,30 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession * specific timestamp for partition starting only presented as "second", and ending not presented - Here we expect global timestamp will take effect. + * offsets + starting only presented as "earliest", and ending not presented + + The preference goes to global timestamp -> timestamp for partition -> offsets */ - verifyTimestampRelatedQueryResult({ df => - val startTopicTimestamps = Map( - (0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*) - val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + val startTopicTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*) + val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + + // all options are specified: global timestamp + verifyTimestampRelatedQueryResult({ df => df .option("startingTimestamp", timestamps(2)) .option("startingOffsetsByTimestamp", startingTimestamps) + .option("startingOffsets", "earliest") }, topic, 20 to 29) + + // timestamp for partition and offsets are specified: timestamp for partition + verifyTimestampRelatedQueryResult({ df => + df + .option("startingOffsetsByTimestamp", startingTimestamps) + .option("startingOffsets", "earliest") + }, topic, 10 to 29) } test("no matched offset for timestamp - endingOffsets") { From 20b276d2a6478576fdce73b24ec4d67371b190df Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 25 May 2021 16:53:05 +0900 Subject: [PATCH 4/5] Change versioning --- docs/structured-streaming-kafka-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 76f8c50da575..d1334f07f24b 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -532,7 +532,7 @@ The returned offset for each partition is the earliest offset whose timestamp is The behavior varies across options if Kafka doesn't return the matched offset - check the description of each option. Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value. -For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details. 
+For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details. Also, the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details. Timestamp offset options require Kafka 0.10.1.0 or higher. From b72f590de83217aa30753df1a28f2b1ac9459983 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 25 May 2021 20:12:22 +0900 Subject: [PATCH 5/5] Fix indentation --- .../org/apache/spark/sql/kafka010/KafkaOffsetReader.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 9c957d3bff7f..764c6434627b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -87,8 +87,9 @@ private[kafka010] trait KafkaOffsetReader { * @param timestamp the timestamp. * @param failsOnNoMatchingOffset whether to fail the query when no matched offset can be found. */ - def fetchGlobalTimestampBasedOffsets(timestamp: Long, - failsOnNoMatchingOffset: Boolean): KafkaSourceOffset + def fetchGlobalTimestampBasedOffsets( + timestamp: Long, + failsOnNoMatchingOffset: Boolean): KafkaSourceOffset /** * Fetch the earliest offsets for the topic partitions that are indicated