diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index ef4cdc2608f4..89732d309aa2 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -362,6 +362,27 @@ The following configurations are optional:
| Option | value | default | query type | meaning |
|---|---|---|---|---|
| startingOffsetsByTimestamp | +json string + """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ + | +none (the value of startingOffsets will apply) |
+ streaming and batch | +The starting point in time when a query is started, a json string specifying a starting timestamp for
+ each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or
+ equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist,
+ the query will fail immediately to prevent unintended reads from such a partition. (This is a limitation for now, and will be addressed in the near future.)
+
+ Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.
+ For more details on KafkaConsumer.offsetsForTimes, please refer to the javadoc.
+ Also, the meaning of the timestamp here can vary according to the Kafka configuration (log.message.timestamp.type): please refer to the Kafka documentation for further details.
+ Note: This option requires Kafka 0.10.1.0 or higher.
+ Note2: startingOffsetsByTimestamp takes precedence over startingOffsets.
+ Note3: For streaming queries, this only applies when a new query is started; resuming will
+ always pick up from where the query left off. Newly discovered partitions during a query will start at
+ earliest. |
+
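To make the new documentation concrete, here is a minimal streaming sketch using startingOffsetsByTimestamp. It only mirrors the option name and JSON format introduced by this patch; the topic name, broker address, and timestamp values are placeholders, not taken from the change itself.

```scala
// Illustrative only: topic, broker address, and timestamps (epoch millis) are assumptions.
val startingByTimestamp =
  """{"topicA": {"0": 1570000000000, "1": 1570000000000}}"""

val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA")
  // Takes precedence over startingOffsets when both are set (see Note2 above).
  .option("startingOffsetsByTimestamp", startingByTimestamp)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```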
| startingOffsets | "earliest", "latest" (streaming only), or json string @@ -377,6 +398,25 @@ The following configurations are optional: always pick up from where the query left off. Newly discovered partitions during a query will start at earliest. | |||
| endingOffsetsByTimestamp | +json string + """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ + | +latest | +batch query | +The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
+ The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to
+ the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will
+ be set to latest.
+
+ Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.
+ For more details on KafkaConsumer.offsetsForTimes, please refer to the javadoc.
+ Also, the meaning of the timestamp here can vary according to the Kafka configuration (log.message.timestamp.type): please refer to the Kafka documentation for further details.
+ Note: This option requires Kafka 0.10.1.0 or higher.
+ Note2: endingOffsetsByTimestamp takes precedence over endingOffsets.
+ |
+
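For the batch side, a similarly hedged sketch shows both timestamp options together. Again the topic, broker address, and timestamps are placeholders; the fallback-to-latest behavior for unmatched ending timestamps is the one documented in the row above.

```scala
// Illustrative only: topic, broker address, and timestamps (epoch millis) are assumptions.
val startingByTimestamp =
  """{"topicA": {"0": 1570000000000, "1": 1570000000000}}"""
val endingByTimestamp =
  """{"topicA": {"0": 1570000600000, "1": 1570000600000}}"""

val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA")
  .option("startingOffsetsByTimestamp", startingByTimestamp)
  // If no offset matches an ending timestamp, that partition's end is set to latest.
  .option("endingOffsetsByTimestamp", endingByTimestamp)
  .load()
  .selectExpr("CAST(value AS STRING)")
```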
| endingOffsets | latest or json string diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala index 868edb5dcdc0..6dd5af2389a8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala @@ -68,7 +68,7 @@ private object JsonUtils { partOffsets.map { case (part, offset) => new TopicPartition(topic, part) -> offset } - }.toMap + } } catch { case NonFatal(x) => throw new IllegalArgumentException( @@ -76,12 +76,27 @@ private object JsonUtils { } } + def partitionTimestamps(str: String): Map[TopicPartition, Long] = { + try { + Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partTimestamps) => + partTimestamps.map { case (part, timestamp) => + new TopicPartition(topic, part) -> timestamp + } + } + } catch { + case NonFatal(x) => + throw new IllegalArgumentException( + s"""Expected e.g. {"topicA": {"0": 123456789, "1": 123456789}, + |"topicB": {"0": 123456789, "1": 123456789}}, got $str""".stripMargin) + } + } + /** * Write per-TopicPartition offsets as json string */ def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = { val result = new HashMap[String, HashMap[Int, Long]]() - implicit val ordering = new Ordering[TopicPartition] { + implicit val order = new Ordering[TopicPartition] { override def compare(x: TopicPartition, y: TopicPartition): Int = { Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition)) } @@ -95,4 +110,9 @@ private object JsonUtils { } Serialization.write(result) } + + def partitionTimestamps(topicTimestamps: Map[TopicPartition, Long]): String = { + // For now it's same as partitionOffsets + partitionOffsets(topicTimestamps) + } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala index 667c38368191..3006770f306c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala @@ -59,8 +59,8 @@ private[kafka010] class KafkaBatch( // Leverage the KafkaReader to obtain the relevant partition offsets val (fromPartitionOffsets, untilPartitionOffsets) = { try { - (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets), - kafkaOffsetReader.fetchPartitionOffsets(endingOffsets)) + (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, isStartingOffsets = true), + kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, isStartingOffsets = false)) } finally { kafkaOffsetReader.close() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 9e7b7d6db203..0603ae39ba62 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -69,6 +69,8 @@ class KafkaContinuousStream( case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => 
offsetReader.fetchSpecificOffsets(p, reportDataLoss) + case SpecificTimestampRangeLimit(p) => offsetReader.fetchSpecificTimestampBasedOffsets(p, + failsOnNoMatchingOffset = true) } logInfo(s"Initial offsets: $offsets") offsets diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 6ea6efe5d1b7..01f6ba444516 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -192,6 +192,8 @@ private[kafka010] class KafkaMicroBatchStream( KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) + case SpecificTimestampRangeLimit(p) => + kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala index 80a026f4f5d7..d64b5d4f7e9e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala @@ -42,6 +42,13 @@ private[kafka010] case object LatestOffsetRangeLimit extends KafkaOffsetRangeLim private[kafka010] case class SpecificOffsetRangeLimit( partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit +/** + * Represents the desire to bind to earliest offset which timestamp for the offset is equal or + * greater than specific timestamp. + */ +private[kafka010] case class SpecificTimestampRangeLimit( + topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit + private[kafka010] object KafkaOffsetRangeLimit { /** * Used to denote offset range limits that are resolved via Kafka diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 20f2ce11d4af..0179f4dd822f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -26,7 +26,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.util.control.NonFatal -import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, OffsetAndTimestamp} import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging @@ -126,12 +126,14 @@ private[kafka010] class KafkaOffsetReader( * Fetch the partition offsets for the topic partitions that are indicated * in the [[ConsumerStrategy]] and [[KafkaOffsetRangeLimit]]. 
*/ - def fetchPartitionOffsets(offsetRangeLimit: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = { + def fetchPartitionOffsets( + offsetRangeLimit: KafkaOffsetRangeLimit, + isStartingOffsets: Boolean): Map[TopicPartition, Long] = { def validateTopicPartitions(partitions: Set[TopicPartition], partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { assert(partitions == partitionOffsets.keySet, "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + - "Use -1 for latest, -2 for earliest, if you don't care.\n" + + "Use -1 for latest, -2 for earliest.\n" + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}") logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") partitionOffsets @@ -147,6 +149,9 @@ private[kafka010] class KafkaOffsetReader( }.toMap case SpecificOffsetRangeLimit(partitionOffsets) => validateTopicPartitions(partitions, partitionOffsets) + case SpecificTimestampRangeLimit(partitionTimestamps) => + fetchSpecificTimestampBasedOffsets(partitionTimestamps, + failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets } } @@ -161,23 +166,83 @@ private[kafka010] class KafkaOffsetReader( def fetchSpecificOffsets( partitionOffsets: Map[TopicPartition, Long], reportDataLoss: String => Unit): KafkaSourceOffset = { - val fetched = runUninterruptibly { - withRetriesWithoutInterrupt { - // Poll to get the latest assigned partitions - consumer.poll(0) - val partitions = consumer.assignment() + val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => + assert(partitions.asScala == partitionOffsets.keySet, + "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + + "Use -1 for latest, -2 for earliest, if you don't care.\n" + + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") + } - // Call `position` to wait until the potential offset request triggered by `poll(0)` is - // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by - // `poll(0)` may reset offsets that should have been set by another request. - partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {}) + val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => + partitionOffsets + } + + val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched => + partitionOffsets.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && + off != KafkaOffsetRangeLimit.EARLIEST => + if (fetched(tp) != off) { + reportDataLoss( + s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") + } + case _ => + // no real way to check that beginning or end is reasonable + } + } + + fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets, + fnAssertFetchedOffsets) + } + + def fetchSpecificTimestampBasedOffsets( + partitionTimestamps: Map[TopicPartition, Long], + failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = { + val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => + assert(partitions.asScala == partitionTimestamps.keySet, + "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " + + s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}") + logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to $partitionTimestamps") + } - consumer.pause(partitions) - assert(partitions.asScala == partitionOffsets.keySet, - "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + - "Use -1 for latest, -2 for earliest, if you don't care.\n" + - s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") - logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") + val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => { + val converted = partitionTimestamps.map { case (tp, timestamp) => + tp -> java.lang.Long.valueOf(timestamp) + }.asJava + + val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] = + consumer.offsetsForTimes(converted) + + offsetForTime.asScala.map { case (tp, offsetAndTimestamp) => + if (failsOnNoMatchingOffset) { + assert(offsetAndTimestamp != null, "No offset matched from request of " + + s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.") + } + + if (offsetAndTimestamp == null) { + tp -> KafkaOffsetRangeLimit.LATEST + } else { + tp -> offsetAndTimestamp.offset() + } + }.toMap + } + } + + val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => } + + fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets, + fnAssertFetchedOffsets) + } + + private def fetchSpecificOffsets0( + fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit, + fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long], + fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit): KafkaSourceOffset = { + val fetched = partitionsAssignedToConsumer { + partitions => { + fnAssertParametersWithPartitions(partitions) + + val partitionOffsets = fnRetrievePartitionOffsets(partitions) partitionOffsets.foreach { case (tp, KafkaOffsetRangeLimit.LATEST) => @@ -186,22 +251,15 @@ private[kafka010] class KafkaOffsetReader( consumer.seekToBeginning(ju.Arrays.asList(tp)) case (tp, off) => consumer.seek(tp, off) } + partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) } } } - partitionOffsets.foreach { - case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && - off != KafkaOffsetRangeLimit.EARLIEST => - if (fetched(tp) != off) { - reportDataLoss( - s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") - } - case _ => - // no real way to check that beginning or end is reasonable - } + fnAssertFetchedOffsets(fetched) + KafkaSourceOffset(fetched) } @@ -209,20 +267,15 @@ private[kafka010] class KafkaOffsetReader( * Fetch the earliest offsets for the topic partitions that are indicated * in the [[ConsumerStrategy]]. */ - def fetchEarliestOffsets(): Map[TopicPartition, Long] = runUninterruptibly { - withRetriesWithoutInterrupt { - // Poll to get the latest assigned partitions - consumer.poll(0) - val partitions = consumer.assignment() - consumer.pause(partitions) - logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to the beginning") + def fetchEarliestOffsets(): Map[TopicPartition, Long] = partitionsAssignedToConsumer( + partitions => { + logDebug("Seeking to the beginning") consumer.seekToBeginning(partitions) val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap logDebug(s"Got earliest offsets for partition : $partitionOffsets") partitionOffsets - } - } + }, fetchingEarliestOffset = true) /** * Fetch the latest offsets for the topic partitions that are indicated @@ -239,19 +292,9 @@ private[kafka010] class KafkaOffsetReader( * distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying. */ def fetchLatestOffsets( - knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly { - withRetriesWithoutInterrupt { - // Poll to get the latest assigned partitions - consumer.poll(0) - val partitions = consumer.assignment() - - // Call `position` to wait until the potential offset request triggered by `poll(0)` is - // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by - // `poll(0)` may reset offsets that should have been set by another request. - partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {}) - - consumer.pause(partitions) - logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.") + knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = + partitionsAssignedToConsumer { partitions => { + logDebug("Seeking to the end.") if (knownOffsets.isEmpty) { consumer.seekToEnd(partitions) @@ -315,25 +358,40 @@ private[kafka010] class KafkaOffsetReader( if (newPartitions.isEmpty) { Map.empty[TopicPartition, Long] } else { - runUninterruptibly { - withRetriesWithoutInterrupt { - // Poll to get the latest assigned partitions - consumer.poll(0) - val partitions = consumer.assignment() - consumer.pause(partitions) - logDebug(s"\tPartitions assigned to consumer: $partitions") - - // Get the earliest offset of each partition - consumer.seekToBeginning(partitions) - val partitionOffsets = newPartitions.filter { p => - // When deleting topics happen at the same time, some partitions may not be in - // `partitions`. So we need to ignore them - partitions.contains(p) - }.map(p => p -> consumer.position(p)).toMap - logDebug(s"Got earliest offsets for new partitions: $partitionOffsets") - partitionOffsets - } + partitionsAssignedToConsumer(partitions => { + // Get the earliest offset of each partition + consumer.seekToBeginning(partitions) + val partitionOffsets = newPartitions.filter { p => + // When deleting topics happen at the same time, some partitions may not be in + // `partitions`. So we need to ignore them + partitions.contains(p) + }.map(p => p -> consumer.position(p)).toMap + logDebug(s"Got earliest offsets for new partitions: $partitionOffsets") + partitionOffsets + }, fetchingEarliestOffset = true) + } + } + + private def partitionsAssignedToConsumer( + body: ju.Set[TopicPartition] => Map[TopicPartition, Long], + fetchingEarliestOffset: Boolean = false) + : Map[TopicPartition, Long] = runUninterruptibly { + + withRetriesWithoutInterrupt { + // Poll to get the latest assigned partitions + consumer.poll(0) + val partitions = consumer.assignment() + + if (!fetchingEarliestOffset) { + // Call `position` to wait until the potential offset request triggered by `poll(0)` is + // done. 
This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by + // `poll(0)` may reset offsets that should have been set by another request. + partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {}) } + + consumer.pause(partitions) + logDebug(s"Partitions assigned to consumer: $partitions.") + body(partitions) } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index 886f6b0fe0a5..61479c992039 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -68,8 +68,8 @@ private[kafka010] class KafkaRelation( // Leverage the KafkaReader to obtain the relevant partition offsets val (fromPartitionOffsets, untilPartitionOffsets) = { try { - (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets), - kafkaOffsetReader.fetchPartitionOffsets(endingOffsets)) + (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, isStartingOffsets = true), + kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, isStartingOffsets = false)) } finally { kafkaOffsetReader.close() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 29944dc3fbf1..e1392b6215d3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -105,6 +105,8 @@ private[kafka010] class KafkaSource( case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss) + case SpecificTimestampRangeLimit(p) => + kafkaReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index a7f8db35d7cf..c15f08d78741 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -89,7 +89,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParameters, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveParameters, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) val kafkaOffsetReader = new KafkaOffsetReader( strategy(caseInsensitiveParameters), @@ -126,11 +127,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters) val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParameters, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) + 
caseInsensitiveParameters, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) assert(startingRelationOffsets != LatestOffsetRangeLimit) val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParameters, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveParameters, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) assert(endingRelationOffsets != EarliestOffsetRangeLimit) val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean @@ -321,13 +324,17 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // Stream specific options params.get(ENDING_OFFSETS_OPTION_KEY).map(_ => throw new IllegalArgumentException("ending offset not valid in streaming queries")) + params.get(ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY).map(_ => + throw new IllegalArgumentException("ending timestamp not valid in streaming queries")) + validateGeneralOptions(params) } private def validateBatchOptions(params: CaseInsensitiveMap[String]) = { // Batch specific options KafkaSourceProvider.getKafkaOffsetRangeLimit( - params, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match { + params, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, + EarliestOffsetRangeLimit) match { case EarliestOffsetRangeLimit => // good to go case LatestOffsetRangeLimit => throw new IllegalArgumentException("starting offset can't be latest " + @@ -339,10 +346,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister "be latest for batch queries on Kafka") case _ => // ignore } + case _: SpecificTimestampRangeLimit => // good to go } KafkaSourceProvider.getKafkaOffsetRangeLimit( - params, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match { + params, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY, + LatestOffsetRangeLimit) match { case EarliestOffsetRangeLimit => throw new IllegalArgumentException("ending offset can't be earliest " + "for batch queries on Kafka") @@ -354,6 +363,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister "earliest for batch queries on Kafka") case _ => // ignore } + case _: SpecificTimestampRangeLimit => // good to go } validateGeneralOptions(params) @@ -420,10 +430,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) new KafkaBatch( strategy(caseInsensitiveOptions), @@ -446,7 +458,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_OPTION_KEY, 
LatestOffsetRangeLimit) val kafkaOffsetReader = new KafkaOffsetReader( strategy(caseInsensitiveOptions), @@ -474,7 +487,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, + STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) val kafkaOffsetReader = new KafkaOffsetReader( strategy(caseInsensitiveOptions), @@ -500,6 +514,8 @@ private[kafka010] object KafkaSourceProvider extends Logging { private val STRATEGY_OPTION_KEYS = Set(SUBSCRIBE, SUBSCRIBE_PATTERN, ASSIGN) private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets" private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets" + private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp" + private[kafka010] val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp" private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss" private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions" private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger" @@ -543,15 +559,20 @@ private[kafka010] object KafkaSourceProvider extends Logging { def getKafkaOffsetRangeLimit( params: CaseInsensitiveMap[String], + offsetByTimestampOptionKey: String, offsetOptionKey: String, defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = { - params.get(offsetOptionKey).map(_.trim) match { - case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => - LatestOffsetRangeLimit - case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => - EarliestOffsetRangeLimit - case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) - case None => defaultOffsets + params.get(offsetByTimestampOptionKey).map(_.trim) match { + case Some(json) => SpecificTimestampRangeLimit(JsonUtils.partitionTimestamps(json)) + case None => + params.get(offsetOptionKey).map(_.trim) match { + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => + LatestOffsetRangeLimit + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => + EarliestOffsetRangeLimit + case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) + case None => defaultOffsets + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala index d97f627fbac0..a790ed064c3d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala @@ -323,7 +323,10 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester private def prepareTestTopicHavingTestMessages(topic: String) = { val data = (1 to 1000).map(i => (i.toString, Seq[(String, Array[Byte])]())) testUtils.createTopic(topic, 1) - testUtils.sendMessages(topic, data.toArray, None) + val messages = data.map { case (value, hdrs) => + new RecordBuilder(topic, value).headers(hdrs).build() + } + testUtils.sendMessages(messages) data } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 609cf3ce4bd7..26136203b09a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1178,9 +1178,10 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4)) require(testUtils.getLatestOffsets(Set(topic)).size === 5) + val headers = Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8))) (31 to 35).map { num => - (num - 31, (num.toString, Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8))))) - }.foreach { rec => testUtils.sendMessage(topic, rec._2, Some(rec._1)) } + new RecordBuilder(topic, num.toString).partition(num - 31).headers(headers).build() + }.foreach { rec => testUtils.sendMessage(rec) } val kafka = spark .readStream @@ -1277,6 +1278,16 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { "failOnDataLoss" -> failOnDataLoss.toString) } + test(s"assign from specific timestamps (failOnDataLoss: $failOnDataLoss)") { + val topic = newTopic() + testFromSpecificTimestamps( + topic, + failOnDataLoss = failOnDataLoss, + addPartitions = false, + "assign" -> assignString(topic, 0 to 4), + "failOnDataLoss" -> failOnDataLoss.toString) + } + test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") { val topic = newTopic() testFromLatestOffsets( @@ -1300,6 +1311,12 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic) } + test(s"subscribing topic by name from specific timestamps (failOnDataLoss: $failOnDataLoss)") { + val topic = newTopic() + testFromSpecificTimestamps(topic, failOnDataLoss = failOnDataLoss, addPartitions = true, + "subscribe" -> topic) + } + test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") { val topicPrefix = newTopic() val topic = topicPrefix + "-suffix" @@ -1328,6 +1345,17 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { failOnDataLoss = failOnDataLoss, "subscribePattern" -> s"$topicPrefix-.*") } + + test(s"subscribing topic by pattern from specific timestamps " + + s"(failOnDataLoss: $failOnDataLoss)") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-suffix" + testFromSpecificTimestamps( + topic, + failOnDataLoss = failOnDataLoss, + addPartitions = true, + "subscribePattern" -> s"$topicPrefix-.*") + } } test("bad source options") { @@ -1347,6 +1375,9 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { // Specifying an ending offset testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries") + testBadOptions("subscribe" -> "t", "endingOffsetsByTimestamp" -> "{\"t\": {\"0\": 1000}}")( + "Ending timestamp not valid in streaming queries") + // No strategy specified testBadOptions()("options must be specified", "subscribe", "subscribePattern") @@ -1395,7 +1426,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""", SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) { val offset = getKafkaOffsetRangeLimit( - CaseInsensitiveMap[String](Map(optionKey -> optionValue)), optionKey, answer) + CaseInsensitiveMap[String](Map(optionKey -> optionValue)), "dummy", 
optionKey, + answer) assert(offset === answer) } @@ -1403,7 +1435,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit), (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) { val offset = getKafkaOffsetRangeLimit( - CaseInsensitiveMap[String](Map.empty), optionKey, answer) + CaseInsensitiveMap[String](Map.empty), "dummy", optionKey, answer) assert(offset === answer) } } @@ -1468,12 +1500,89 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { ) } + private def testFromSpecificTimestamps( + topic: String, + failOnDataLoss: Boolean, + addPartitions: Boolean, + options: (String, String)*): Unit = { + def sendMessages(topic: String, msgs: Seq[String], part: Int, ts: Long): Unit = { + val records = msgs.map { msg => + new RecordBuilder(topic, msg).partition(part).timestamp(ts).build() + } + testUtils.sendMessages(records) + } + + testUtils.createTopic(topic, partitions = 5) + + val firstTimestamp = System.currentTimeMillis() - 5000 + sendMessages(topic, Array(-20).map(_.toString), 0, firstTimestamp) + sendMessages(topic, Array(-10).map(_.toString), 1, firstTimestamp) + sendMessages(topic, Array(0, 1).map(_.toString), 2, firstTimestamp) + sendMessages(topic, Array(10, 11).map(_.toString), 3, firstTimestamp) + sendMessages(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp) + + val secondTimestamp = firstTimestamp + 1000 + sendMessages(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp) + sendMessages(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp) + sendMessages(topic, Array(2).map(_.toString), 2, secondTimestamp) + sendMessages(topic, Array(12).map(_.toString), 3, secondTimestamp) + // no data after second timestamp for partition 4 + + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + // we intentionally starts from second timestamp, + // except for partition 4 - it starts from first timestamp + val startPartitionTimestamps: Map[TopicPartition, Long] = Map( + (0 to 3).map(new TopicPartition(topic, _) -> secondTimestamp): _* + ) ++ Map(new TopicPartition(topic, 4) -> firstTimestamp) + val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps) + + val reader = spark + .readStream + .format("kafka") + .option("startingOffsetsByTimestamp", startingTimestamps) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("failOnDataLoss", failOnDataLoss.toString) + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + testStream(mapped)( + makeSureGetOffsetCalled, + Execute { q => + val partitions = (0 to 4).map(new TopicPartition(topic, _)) + // wait to reach the last offset in every partition + q.awaitOffset( + 0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), streamingTimeout.toMillis) + }, + CheckAnswer(-21, -22, -11, -12, 2, 12, 20, 21, 22), + StopStream, + StartStream(), + CheckAnswer(-21, -22, -11, -12, 2, 12, 20, 21, 22), // Should get the data back on recovery + StopStream, + AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped + StartStream(), + CheckAnswer(-21, -22, -11, -12, 2, 12, 20, 21, 22, 30, 31, 32), // Should get the added data + AssertOnQuery("Add partitions") { query: StreamExecution => + if (addPartitions) setTopicPartitions(topic, 10, query) + true + }, + 
AddKafkaData(Set(topic), 40, 41, 42, 43, 44)(ensureDataInMultiplePartition = true), + CheckAnswer(-21, -22, -11, -12, 2, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42, 43, 44), + StopStream + ) + } + test("Kafka column types") { val now = System.currentTimeMillis() val topic = newTopic() testUtils.createTopic(newTopic(), partitions = 1) testUtils.sendMessage( - topic, ("1", Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))), None + new RecordBuilder(topic, "1") + .headers(Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))).build() ) val kafka = spark diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 3c88609bcb45..556eab4b5638 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.Locale import java.util.concurrent.atomic.AtomicInteger +import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.util.Random @@ -28,11 +29,11 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf -import org.apache.spark.sql.QueryTest +import org.apache.spark.SparkException +import org.apache.spark.sql.{DataFrameReader, QueryTest} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.Utils @@ -159,13 +160,15 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession val topic = newTopic() testUtils.createTopic(topic, partitions = 3) testUtils.sendMessage( - topic, ("1", Seq()), Some(0) + new RecordBuilder(topic, "1").headers(Seq()).partition(0).build() ) testUtils.sendMessage( - topic, ("2", Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))), Some(1) + new RecordBuilder(topic, "2").headers( + Seq(("a", "b".getBytes(UTF_8)), ("c", "d".getBytes(UTF_8)))).partition(1).build() ) testUtils.sendMessage( - topic, ("3", Seq(("e", "f".getBytes(UTF_8)), ("e", "g".getBytes(UTF_8)))), Some(2) + new RecordBuilder(topic, "3").headers( + Seq(("e", "f".getBytes(UTF_8)), ("e", "g".getBytes(UTF_8)))).partition(2).build() ) // Implicit offset values, should default to earliest and latest @@ -176,6 +179,191 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession ("3", Seq(("e", "f".getBytes(UTF_8)), ("e", "g".getBytes(UTF_8))))).toDF) } + test("timestamp provided for starting and ending") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // timestamp both presented: starting "first" ending "finalized" + verifyTimestampRelatedQueryResult({ df => + val startPartitionTimestamps: Map[TopicPartition, Long] = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*) + val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps) + + val endPartitionTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(2)): _*) + val endingTimestamps = JsonUtils.partitionTimestamps(endPartitionTimestamps) + + df.option("startingOffsetsByTimestamp", 
startingTimestamps) + .option("endingOffsetsByTimestamp", endingTimestamps) + }, topic, 10 to 19) + } + + test("timestamp provided for starting, offset provided for ending") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // starting only presented as "first", and ending presented as endingOffsets + verifyTimestampRelatedQueryResult({ df => + val startTopicTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps.head): _*) + val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + + val endPartitionOffsets = Map( + new TopicPartition(topic, 0) -> -1L, // -1 => latest + new TopicPartition(topic, 1) -> -1L, + new TopicPartition(topic, 2) -> 1L // explicit offset - take only first one + ) + val endingOffsets = JsonUtils.partitionOffsets(endPartitionOffsets) + + // so we here expect full of records from partition 0 and 1, and only the first record + // from partition 2 which is "2" + + df.option("startingOffsetsByTimestamp", startingTimestamps) + .option("endingOffsets", endingOffsets) + }, topic, (0 to 29).filterNot(_ % 3 == 2) ++ Seq(2)) + } + + test("timestamp provided for ending, offset provided for starting") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // ending only presented as "third", and starting presented as startingOffsets + verifyTimestampRelatedQueryResult({ df => + val startPartitionOffsets = Map( + new TopicPartition(topic, 0) -> -2L, // -2 => earliest + new TopicPartition(topic, 1) -> -2L, + new TopicPartition(topic, 2) -> 0L // explicit earliest + ) + val startingOffsets = JsonUtils.partitionOffsets(startPartitionOffsets) + + val endTopicTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(2)): _*) + val endingTimestamps = JsonUtils.partitionTimestamps(endTopicTimestamps) + + df.option("startingOffsets", startingOffsets) + .option("endingOffsetsByTimestamp", endingTimestamps) + }, topic, 0 to 19) + } + + test("timestamp provided for starting, ending not provided") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // starting only presented as "second", and ending not presented + verifyTimestampRelatedQueryResult({ df => + val startTopicTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(1)): _*) + val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + + df.option("startingOffsetsByTimestamp", startingTimestamps) + }, topic, 10 to 29) + } + + test("timestamp provided for ending, starting not provided") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // ending only presented as "third", and starting not presented + verifyTimestampRelatedQueryResult({ df => + val endTopicTimestamps = Map( + (0 to 2).map(new TopicPartition(topic, _) -> timestamps(2)): _*) + val endingTimestamps = JsonUtils.partitionTimestamps(endTopicTimestamps) + + df.option("endingOffsetsByTimestamp", endingTimestamps) + }, topic, 0 to 19) + } + + test("no matched offset for timestamp - startingOffsets") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + val e = intercept[SparkException] { + verifyTimestampRelatedQueryResult({ df => + // partition 2 will make query fail + val startTopicTimestamps = Map( + (0 to 1).map(new TopicPartition(topic, _) -> timestamps(1)): _*) ++ + Map(new TopicPartition(topic, 2) -> Long.MaxValue) + + val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + + df.option("startingOffsetsByTimestamp", startingTimestamps) + }, topic, Seq.empty) + } + + @tailrec + def 
assertionErrorInExceptionChain(e: Throwable): Boolean = { + if (e.isInstanceOf[AssertionError]) { + true + } else if (e.getCause == null) { + false + } else { + assertionErrorInExceptionChain(e.getCause) + } + } + + assert(assertionErrorInExceptionChain(e), + "Cannot find expected AssertionError in chained exceptions") + } + + test("no matched offset for timestamp - endingOffsets") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // the query will run fine, since we allow no matching offset for timestamp + // if it's endingOffsets + // for partition 0 and 1, it only takes records between first and second timestamp + // for partition 2, it will take all records + verifyTimestampRelatedQueryResult({ df => + val endTopicTimestamps = Map( + (0 to 1).map(new TopicPartition(topic, _) -> timestamps(1)): _*) ++ + Map(new TopicPartition(topic, 2) -> Long.MaxValue) + + val endingTimestamps = JsonUtils.partitionTimestamps(endTopicTimestamps) + + df.option("endingOffsetsByTimestamp", endingTimestamps) + }, topic, (0 to 9) ++ (10 to 29).filter(_ % 3 == 2)) + } + + private def prepareTimestampRelatedUnitTest: (String, Seq[Long]) = { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + + def sendMessages(topic: String, msgs: Array[String], part: Int, ts: Long): Unit = { + val records = msgs.map { msg => + new RecordBuilder(topic, msg).partition(part).timestamp(ts).build() + } + testUtils.sendMessages(records) + } + + val firstTimestamp = System.currentTimeMillis() - 5000 + (0 to 2).foreach { partNum => + sendMessages(topic, (0 to 9).filter(_ % 3 == partNum) + .map(_.toString).toArray, partNum, firstTimestamp) + } + + val secondTimestamp = firstTimestamp + 1000 + (0 to 2).foreach { partNum => + sendMessages(topic, (10 to 19).filter(_ % 3 == partNum) + .map(_.toString).toArray, partNum, secondTimestamp) + } + + val thirdTimestamp = secondTimestamp + 1000 + (0 to 2).foreach { partNum => + sendMessages(topic, (20 to 29).filter(_ % 3 == partNum) + .map(_.toString).toArray, partNum, thirdTimestamp) + } + + val finalizedTimestamp = thirdTimestamp + 1000 + + (topic, Seq(firstTimestamp, secondTimestamp, thirdTimestamp, finalizedTimestamp)) + } + + private def verifyTimestampRelatedQueryResult( + optionFn: DataFrameReader => DataFrameReader, + topic: String, + expectation: Seq[Int]): Unit = { + val df = spark.read + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", topic) + + val df2 = optionFn(df).load().selectExpr("CAST(value AS STRING)") + checkAnswer(df2, expectation.map(_.toString).toDF) + } + test("reuse same dataframe in query") { // This test ensures that we do not cache the Kafka Consumer in KafkaRelation val topic = newTopic() diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index f7114129a3cd..fec724bc36d0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -350,57 +350,33 @@ class KafkaTestUtils( } } - /** Java-friendly function for sending messages to the Kafka broker */ - def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { - sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) + def sendMessages(topic: String, msgs: Array[String]): Seq[(String, RecordMetadata)] = { + 
sendMessages(topic, msgs, None) } - /** Send the messages to the Kafka broker */ - def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { - val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray - sendMessages(topic, messages) - } - - /** Send the array of messages to the Kafka broker */ - def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = { - sendMessages(topic, messages, None) - } - - /** Send the array of messages to the Kafka broker using specified partition */ def sendMessages( topic: String, - messages: Array[String], - partition: Option[Int]): Seq[(String, RecordMetadata)] = { - sendMessages(topic, messages.map(m => (m, Seq())), partition) + msgs: Array[String], + part: Option[Int]): Seq[(String, RecordMetadata)] = { + val records = msgs.map { msg => + val builder = new RecordBuilder(topic, msg) + part.foreach { p => builder.partition(p) } + builder.build() + } + sendMessages(records) } - /** Send record to the Kafka broker with headers using specified partition */ - def sendMessage(topic: String, - record: (String, Seq[(String, Array[Byte])]), - partition: Option[Int]): Seq[(String, RecordMetadata)] = { - sendMessages(topic, Array(record).toSeq, partition) + def sendMessage(msg: ProducerRecord[String, String]): Seq[(String, RecordMetadata)] = { + sendMessages(Array(msg)) } - /** Send the array of records to the Kafka broker with headers using specified partition */ - def sendMessages(topic: String, - records: Seq[(String, Seq[(String, Array[Byte])])], - partition: Option[Int]): Seq[(String, RecordMetadata)] = { + def sendMessages(msgs: Seq[ProducerRecord[String, String]]): Seq[(String, RecordMetadata)] = { producer = new KafkaProducer[String, String](producerConfiguration) val offsets = try { - records.map { case (value, header) => - val headers = header.map { case (k, v) => - new RecordHeader(k, v).asInstanceOf[Header] - } - val record = partition match { - case Some(p) => - new ProducerRecord[String, String](topic, p, null, value, headers.asJava) - case None => - new ProducerRecord[String, String](topic, null, null, value, headers.asJava) - } - val metadata = producer.send(record).get(10, TimeUnit.SECONDS) - logInfo(s"\tSent ($value, $header) to partition ${metadata.partition}," + - " offset ${metadata.offset}") - (value, metadata) + msgs.map { msg => + val metadata = producer.send(msg).get(10, TimeUnit.SECONDS) + logInfo(s"\tSent ($msg) to partition ${metadata.partition}, offset ${metadata.offset}") + (msg.value(), metadata) } } finally { if (producer != null) { @@ -655,4 +631,3 @@ class KafkaTestUtils( } } } - diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/RecordBuilder.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/RecordBuilder.scala new file mode 100644 index 000000000000..ef07798442e5 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/RecordBuilder.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.lang.{Integer => JInt, Long => JLong} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.RecordHeader + +class RecordBuilder(topic: String, value: String) { + var _partition: Option[JInt] = None + var _timestamp: Option[JLong] = None + var _key: Option[String] = None + var _headers: Option[Seq[(String, Array[Byte])]] = None + + def partition(part: JInt): RecordBuilder = { + _partition = Some(part) + this + } + + def partition(part: Int): RecordBuilder = { + _partition = Some(part.intValue()) + this + } + + def timestamp(ts: JLong): RecordBuilder = { + _timestamp = Some(ts) + this + } + + def timestamp(ts: Long): RecordBuilder = { + _timestamp = Some(ts.longValue()) + this + } + + def key(k: String): RecordBuilder = { + _key = Some(k) + this + } + + def headers(hdrs: Seq[(String, Array[Byte])]): RecordBuilder = { + _headers = Some(hdrs) + this + } + + def build(): ProducerRecord[String, String] = { + val part = _partition.orNull + val ts = _timestamp.orNull + val k = _key.orNull + val hdrs = _headers.map { h => + h.map { case (k, v) => new RecordHeader(k, v).asInstanceOf[Header] } + }.map(_.asJava).orNull + + new ProducerRecord[String, String](topic, part, ts, k, value, hdrs) + } +} |