From a1e98ab62dd8f28ec90b8d4ffb1d5fbabaed5540 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 24 Sep 2019 15:22:17 +0900 Subject: [PATCH 1/2] [SPARK-29223][SQL][SS] Enable global timestamp per topic while specifying offset by timestamp in Kafka source --- .../structured-streaming-kafka-integration.md | 16 +- .../sql/kafka010/KafkaOffsetReader.scala | 172 +++++++++++------- .../kafka010/KafkaMicroBatchSourceSuite.scala | 164 +++++++++++++---- .../sql/kafka010/KafkaRelationSuite.scala | 61 +++++-- 4 files changed, 302 insertions(+), 111 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 89732d309aa2..eb46c97c1c25 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -365,15 +365,16 @@ The following configurations are optional: startingOffsetsByTimestamp json string - """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ + """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ or """ {"topicA":{"-1": 1000}, "topicB": {"-1": 2000}} """ - none (the value of startingOffsets will apply) + none (the value of startingOffsets will apply) streaming and batch The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist, the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)

-

+ You can specify global timestamp per topic which applies to all partitions in topic, via setting partition to "-1". + Note that the option is mutually exclusive, you can't specify timestamp for both global and specific partition(s).

Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details.

Also the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details.

@@ -401,15 +402,16 @@ The following configurations are optional: endingOffsetsByTimestamp json string - """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ + """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ or """ {"topicA":{"-1": 1000}, "topicB": {"-1": 2000} """ - latest + the value of endingOffsets will apply batch query - The end point when a batch query is ended, a json string specifying an ending timesamp for each TopicPartition. + The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will be set to latest.

-

+ You can specify global timestamp per topic which applies to all partitions in topic, via setting partition to "-1". + Note that the option is mutually exclusive, you can't specify timestamp for both global and specific partition(s).

Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.

For more details on KafkaConsumer.offsetsForTimes, please refer javadoc for details.

Also the meaning of timestamp here can be vary according to Kafka configuration (log.message.timestamp.type): please refer Kafka documentation for further details.

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 0179f4dd822f..820b44eb7728 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -21,7 +21,7 @@ import java.{util => ju} import java.util.concurrent.Executors import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.util.control.NonFatal @@ -49,6 +49,9 @@ private[kafka010] class KafkaOffsetReader( val driverKafkaParams: ju.Map[String, Object], readerOptions: CaseInsensitiveMap[String], driverGroupIdPrefix: String) extends Logging { + + import KafkaOffsetReader._ + /** * Used to ensure execute fetch operations execute in an UninterruptibleThread */ @@ -166,83 +169,118 @@ private[kafka010] class KafkaOffsetReader( def fetchSpecificOffsets( partitionOffsets: Map[TopicPartition, Long], reportDataLoss: String => Unit): KafkaSourceOffset = { - val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => - assert(partitions.asScala == partitionOffsets.keySet, - "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + - "Use -1 for latest, -2 for earliest, if you don't care.\n" + - s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") - logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") + val fnRetrievePartitionOffsets: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { + case (newParams, _) => newParams } - val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => - partitionOffsets - } + fetchSpecificOffsets0(partitionOffsets, adjustParamsWithPartitionsForOffsets, + fnRetrievePartitionOffsets, assertFetchedOffsetsForOffsets(reportDataLoss)) + } - val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched => - partitionOffsets.foreach { - case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && - off != KafkaOffsetRangeLimit.EARLIEST => - if (fetched(tp) != off) { - reportDataLoss( - s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") - } - case _ => - // no real way to check that beginning or end is reasonable - } - } + private def adjustParamsWithPartitionsForOffsets + : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (params, partitions) => + assert(partitions.asScala == params.keySet, + "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + + "Use -1 for latest, -2 for earliest, if you don't care.\n" + + s"Specified: ${params.keySet} Assigned: ${partitions.asScala}") + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $params") + params + } - fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets, - fnAssertFetchedOffsets) + private def assertFetchedOffsetsForOffsets(reportDataLoss: String => Unit) + : (TPToOffsets, TPToOffsets) => Unit = { case (newParams, fetched) => + newParams.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && + off != KafkaOffsetRangeLimit.EARLIEST => + if (fetched(tp) != off) { + reportDataLoss( + s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") + } + case _ => + // no real way to check that beginning or end is reasonable + } } def fetchSpecificTimestampBasedOffsets( partitionTimestamps: Map[TopicPartition, Long], failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = { - val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions => - assert(partitions.asScala == partitionTimestamps.keySet, - "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " + - s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}") - logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps") - } + val fnAssertFetchedOffsets: (TPToOffsets, TPToOffsets) => Unit = { (_, _) => } - val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => { - val converted = partitionTimestamps.map { case (tp, timestamp) => - tp -> java.lang.Long.valueOf(timestamp) - }.asJava + fetchSpecificOffsets0(partitionTimestamps, + adjustParamsWithPartitionsForTimestampBasedOffset, + retrievePartitionOffsetsForTimestampBasedOffset(failsOnNoMatchingOffset), + fnAssertFetchedOffsets) + } - val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] = - consumer.offsetsForTimes(converted) + private def adjustParamsWithPartitionsForTimestampBasedOffset + : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (params, partitions) => + val paramsGroupedByTopic = params.groupBy(_._1.topic()) + val partitionsGroupedByTopic = partitions.asScala.groupBy(_.topic()) + + assert(paramsGroupedByTopic.keySet == partitionsGroupedByTopic.keySet, + s"Not all specified topics are assigned. Specified: ${params.keySet} " + + s"Assigned: ${partitions.asScala}") + + val newParams: Map[TopicPartition, Long] = paramsGroupedByTopic.map { + case (topic, tpToOffset) => + if (tpToOffset.keySet.map(_.partition()).contains(GLOBAL_PARTITION_NUM)) { + // global timestamp has been set for all partitions in topic + + // we disallow overriding timestamp per partition for simplicity + assert(tpToOffset.size == 1, "Global timestamp for topic cannot be set along with " + + "specifying partition(s). Specify only global timestamp for each topic if you want " + + s"to use global timestamp. Configuration error on topic $topic, specified: $params") + + val partsForTopic = partitionsGroupedByTopic(topic) + val timestampOffset = tpToOffset(new TopicPartition(topic, GLOBAL_PARTITION_NUM)) + + partsForTopic.map { part => part -> timestampOffset }.toMap + } else { + val partsForTopic = partitionsGroupedByTopic(topic) + assert(tpToOffset.keySet == partsForTopic.toSet, + "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify" + + s" all TopicPartitions. Specified: ${params.keySet} Assigned: " + + s"${partitions.asScala}") + tpToOffset + } + }.reduceLeft { (acc: TPToOffsets, tpToOffsets: TPToOffsets) => acc ++ tpToOffsets } - offsetForTime.asScala.map { case (tp, offsetAndTimestamp) => - if (failsOnNoMatchingOffset) { - assert(offsetAndTimestamp != null, "No offset matched from request of " + - s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.") - } + logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $newParams") + newParams + } - if (offsetAndTimestamp == null) { - tp -> KafkaOffsetRangeLimit.LATEST - } else { - tp -> offsetAndTimestamp.offset() - } - }.toMap - } - } + private def retrievePartitionOffsetsForTimestampBasedOffset(failsOnNoMatchingOffset: Boolean) + : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (newPartitionTimestamps, _) => + val converted = newPartitionTimestamps.map { case (tp, timestamp) => + tp -> java.lang.Long.valueOf(timestamp) + }.asJava - val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => } + val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] = + consumer.offsetsForTimes(converted) - fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets, - fnAssertFetchedOffsets) + offsetForTime.asScala.map { case (tp, offsetAndTimestamp) => + if (failsOnNoMatchingOffset) { + assert(offsetAndTimestamp != null, "No offset matched from request of " + + s"topic-partition $tp and timestamp ${newPartitionTimestamps(tp)}.") + } + + if (offsetAndTimestamp == null) { + tp -> KafkaOffsetRangeLimit.LATEST + } else { + tp -> offsetAndTimestamp.offset() + } + }.toMap } private def fetchSpecificOffsets0( - fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit, - fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long], - fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit): KafkaSourceOffset = { + partitionTimestamps: TPToOffsets, + fnAdjustParamsWithPartitions: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets, + fnRetrievePartitionOffsets: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets, + fnAssertFetchedOffsets: (TPToOffsets, TPToOffsets) => Unit): KafkaSourceOffset = { val fetched = partitionsAssignedToConsumer { partitions => { - fnAssertParametersWithPartitions(partitions) - - val partitionOffsets = fnRetrievePartitionOffsets(partitions) + val newPartTimestamps = fnAdjustParamsWithPartitions(partitionTimestamps, partitions) + val partitionOffsets = fnRetrievePartitionOffsets(newPartTimestamps, partitions) partitionOffsets.foreach { case (tp, KafkaOffsetRangeLimit.LATEST) => @@ -252,14 +290,16 @@ private[kafka010] class KafkaOffsetReader( case (tp, off) => consumer.seek(tp, off) } - partitionOffsets.map { + val fetchedOffsets = partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) } + + fnAssertFetchedOffsets(newPartTimestamps, fetchedOffsets) + + fetchedOffsets } } - fnAssertFetchedOffsets(fetched) - KafkaSourceOffset(fetched) } @@ -307,7 +347,7 @@ private[kafka010] class KafkaOffsetReader( * latest offset (offset in `knownOffsets` is great than the one in `partitionOffsets`). */ def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = { - var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]() + var incorrectOffsets = mutable.ArrayBuffer[(TopicPartition, Long, Long)]() partitionOffsets.foreach { case (tp, offset) => knownOffsets.foreach(_.get(tp).foreach { knownOffset => if (knownOffset > offset) { @@ -478,3 +518,11 @@ private[kafka010] class KafkaOffsetReader( _consumer = null // will automatically get reinitialized again } } + +private[kafka010] object KafkaOffsetReader { + // The type is to shorten the type to reduce characters in line: please avoid exposing + // this type to outside of KafkaOffsetReader - parameter/return type of public methods + private type TPToOffsets = Map[TopicPartition, Long] + + val GLOBAL_PARTITION_NUM = -1 +} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 26136203b09a..1f0384d612da 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1288,6 +1288,16 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { "failOnDataLoss" -> failOnDataLoss.toString) } + test(s"assign from global timestamp per topic (failOnDataLoss: $failOnDataLoss)") { + val topic = newTopic() + testFromGlobalTimestamp( + topic, + failOnDataLoss = failOnDataLoss, + addPartitions = false, + "assign" -> assignString(topic, 0 to 4), + "failOnDataLoss" -> failOnDataLoss.toString) + } + test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") { val topic = newTopic() testFromLatestOffsets( @@ -1317,6 +1327,13 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { "subscribe" -> topic) } + test(s"subscribing topic by name from global timestamp per topic" + + s" (failOnDataLoss: $failOnDataLoss)") { + val topic = newTopic() + testFromGlobalTimestamp(topic, failOnDataLoss = failOnDataLoss, addPartitions = true, + "subscribe" -> topic) + } + test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") { val topicPrefix = newTopic() val topic = topicPrefix + "-suffix" @@ -1356,6 +1373,17 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { addPartitions = true, "subscribePattern" -> s"$topicPrefix-.*") } + + test(s"subscribing topic by pattern from global timestamp per topic " + + s"(failOnDataLoss: $failOnDataLoss)") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-suffix" + testFromGlobalTimestamp( + topic, + failOnDataLoss = failOnDataLoss, + addPartitions = true, + "subscribePattern" -> s"$topicPrefix-.*") + } } test("bad source options") { @@ -1505,28 +1533,11 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { failOnDataLoss: Boolean, addPartitions: Boolean, options: (String, String)*): Unit = { - def sendMessages(topic: String, msgs: Seq[String], part: Int, ts: Long): Unit = { - val records = msgs.map { msg => - new RecordBuilder(topic, msg).partition(part).timestamp(ts).build() - } - testUtils.sendMessages(records) - } - testUtils.createTopic(topic, partitions = 5) val firstTimestamp = System.currentTimeMillis() - 5000 - sendMessages(topic, Array(-20).map(_.toString), 0, firstTimestamp) - sendMessages(topic, Array(-10).map(_.toString), 1, firstTimestamp) - sendMessages(topic, Array(0, 1).map(_.toString), 2, firstTimestamp) - sendMessages(topic, Array(10, 11).map(_.toString), 3, firstTimestamp) - sendMessages(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp) - val secondTimestamp = firstTimestamp + 1000 - sendMessages(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp) - sendMessages(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp) - sendMessages(topic, Array(2).map(_.toString), 2, secondTimestamp) - sendMessages(topic, Array(12).map(_.toString), 3, secondTimestamp) - // no data after second timestamp for partition 4 + setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp) require(testUtils.getLatestOffsets(Set(topic)).size === 5) @@ -1537,19 +1548,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { ) ++ Map(new TopicPartition(topic, 4) -> firstTimestamp) val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps) - val reader = spark - .readStream - .format("kafka") - .option("startingOffsetsByTimestamp", startingTimestamps) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("failOnDataLoss", failOnDataLoss.toString) - options.foreach { case (k, v) => reader.option(k, v) } - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - + val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss, + options: _*) testStream(mapped)( makeSureGetOffsetCalled, Execute { q => @@ -1576,6 +1576,108 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { ) } + private def testFromGlobalTimestamp( + topic: String, + failOnDataLoss: Boolean, + addPartitions: Boolean, + options: (String, String)*): Unit = { + testUtils.createTopic(topic, partitions = 5) + + val firstTimestamp = System.currentTimeMillis() - 5000 + val secondTimestamp = firstTimestamp + 1000 + setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp) + // here we should add records in partition 4 which match with second timestamp + // as the query will break if there's no matching records + sendMessagesWithTimestamp(topic, Array(23, 24).map(_.toString), 4, secondTimestamp) + + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + // we intentionally starts from second timestamp for all partitions + // via setting global partition + val startPartitionTimestamps: Map[TopicPartition, Long] = Map( + new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> secondTimestamp) + val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps) + + val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss, + options: _*) + testStream(mapped)( + makeSureGetOffsetCalled, + Execute { q => + // wait to reach the last offset in every partition + val partAndOffsets = (0 to 4).map(new TopicPartition(topic, _)).map { tp => + if (tp.partition() < 4) { + tp -> 3L + } else { + tp -> 5L // we added 2 more records to partition 4 + } + }.toMap + q.awaitOffset(0, KafkaSourceOffset(partAndOffsets), streamingTimeout.toMillis) + }, + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24), + StopStream, + StartStream(), + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24), // Should get the data back on recovery + StopStream, + AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped + StartStream(), + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32), // Should get the added data + AssertOnQuery("Add partitions") { query: StreamExecution => + if (addPartitions) setTopicPartitions(topic, 10, query) + true + }, + AddKafkaData(Set(topic), 40, 41, 42, 43, 44)(ensureDataInMultiplePartition = true), + CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32, 40, 41, 42, 43, 44), + StopStream + ) + } + + private def sendMessagesWithTimestamp( + topic: String, + msgs: Seq[String], + part: Int, + ts: Long): Unit = { + val records = msgs.map { msg => + new RecordBuilder(topic, msg).partition(part).timestamp(ts).build() + } + testUtils.sendMessages(records) + } + + private def setupTestMessagesForTestOnTimestampOffsets( + topic: String, + firstTimestamp: Long, + secondTimestamp: Long): Unit = { + sendMessagesWithTimestamp(topic, Array(-20).map(_.toString), 0, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(-10).map(_.toString), 1, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(0, 1).map(_.toString), 2, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(10, 11).map(_.toString), 3, firstTimestamp) + sendMessagesWithTimestamp(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp) + + sendMessagesWithTimestamp(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp) + sendMessagesWithTimestamp(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp) + sendMessagesWithTimestamp(topic, Array(2).map(_.toString), 2, secondTimestamp) + sendMessagesWithTimestamp(topic, Array(12).map(_.toString), 3, secondTimestamp) + // no data after second timestamp for partition 4 + } + + private def setupDataFrameForTestOnTimestampOffsets( + startingTimestamps: String, + failOnDataLoss: Boolean, + options: (String, String)*): Dataset[_] = { + val reader = spark + .readStream + .format("kafka") + .option("startingOffsetsByTimestamp", startingTimestamps) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("failOnDataLoss", failOnDataLoss.toString) + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + mapped + } + test("Kafka column types") { val now = System.currentTimeMillis() val topic = newTopic() diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 556eab4b5638..dd80a276a553 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -268,6 +268,24 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession }, topic, 0 to 19) } + test("global timestamp provided for starting and ending") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + // timestamp both presented: starting "first" ending "finalized" + verifyTimestampRelatedQueryResult({ df => + val startPartitionTimestamps: Map[TopicPartition, Long] = Map( + new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> timestamps(1)) + val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps) + + val endPartitionTimestamps = Map( + new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> timestamps(2)) + val endingTimestamps = JsonUtils.partitionTimestamps(endPartitionTimestamps) + + df.option("startingOffsetsByTimestamp", startingTimestamps) + .option("endingOffsetsByTimestamp", endingTimestamps) + }, topic, 10 to 19) + } + test("no matched offset for timestamp - startingOffsets") { val (topic, timestamps) = prepareTimestampRelatedUnitTest @@ -284,19 +302,40 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession }, topic, Seq.empty) } - @tailrec - def assertionErrorInExceptionChain(e: Throwable): Boolean = { - if (e.isInstanceOf[AssertionError]) { - true - } else if (e.getCause == null) { - false - } else { - assertionErrorInExceptionChain(e.getCause) - } + assert(findAssertionErrorInExceptionChain(e).isDefined, + "Cannot find expected AssertionError in chained exceptions") + } + + test("specifying both global timestamp and specific timestamp for partition") { + val (topic, timestamps) = prepareTimestampRelatedUnitTest + + val e = intercept[SparkException] { + verifyTimestampRelatedQueryResult({ df => + val startTopicTimestamps = Map( + new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> timestamps(1), + new TopicPartition(topic, 1) -> timestamps(2) + ) + val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps) + + df.option("startingOffsetsByTimestamp", startingTimestamps) + }, topic, Seq.empty) } - assert(assertionErrorInExceptionChain(e), - "Cannot find expected AssertionError in chained exceptions") + val actualError = findAssertionErrorInExceptionChain(e) + assert(actualError.isDefined, "Cannot find expected AssertionError in chained exceptions") + assert(actualError.get.getMessage.contains( + "Global timestamp for topic cannot be set along with specifying partition")) + } + + @tailrec + private def findAssertionErrorInExceptionChain(e: Throwable): Option[Throwable] = { + if (e.isInstanceOf[AssertionError]) { + Some(e) + } else if (e.getCause == null) { + None + } else { + findAssertionErrorInExceptionChain(e.getCause) + } } test("no matched offset for timestamp - endingOffsets") { From 70454134af555d934cd78ee48acb54736cf24a77 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 24 Sep 2019 16:33:52 +0900 Subject: [PATCH 2/2] Roll back unrelated change --- .../org/apache/spark/sql/kafka010/KafkaOffsetReader.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 820b44eb7728..387211d403b0 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -21,7 +21,7 @@ import java.{util => ju} import java.util.concurrent.Executors import scala.collection.JavaConverters._ -import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.util.control.NonFatal @@ -347,7 +347,7 @@ private[kafka010] class KafkaOffsetReader( * latest offset (offset in `knownOffsets` is great than the one in `partitionOffsets`). */ def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = { - var incorrectOffsets = mutable.ArrayBuffer[(TopicPartition, Long, Long)]() + var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]() partitionOffsets.foreach { case (tp, offset) => knownOffsets.foreach(_.get(tp).foreach { knownOffset => if (knownOffset > offset) {