diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 89732d309aa2..eb46c97c1c25 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -365,15 +365,16 @@ The following configurations are optional:
(… startingOffsets will apply.) Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.
For more details on KafkaConsumer.offsetsForTimes, please refer to its javadoc.
Also, the meaning of the timestamp can vary according to the Kafka configuration (log.message.timestamp.type); please refer to the Kafka documentation for further details.
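As a usage illustration (not part of the patch), a streaming read with timestamp-based starting offsets might look like the sketch below. The topic names, broker address, timestamps, and partition counts are placeholders, and the "-1" key is assumed to encode the per-topic global timestamp introduced by this change (see KafkaOffsetReader.GLOBAL_PARTITION_NUM).

```scala
// Hedged sketch: streaming read starting from timestamps. All concrete values
// (topics, broker, timestamps) are placeholders; topicB is assumed to have
// exactly partitions 0 and 1, since all partitions of a topic must be listed
// unless the global ("-1") timestamp is used for that topic.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-starting-offsets-by-timestamp").getOrCreate()
import spark.implicits._

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA,topicB")
  // topicA: one global timestamp for all partitions; topicB: per-partition timestamps
  .option("startingOffsetsByTimestamp",
    """{"topicA": {"-1": 1000}, "topicB": {"0": 2000, "1": 2000}}""")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```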
@@ -401,15 +402,16 @@ The following configurations are optional:
(… endingOffsets will apply.) Spark simply passes the timestamp information to KafkaConsumer.offsetsForTimes, and doesn't interpret or reason about the value.
For more details on KafkaConsumer.offsetsForTimes, please refer to its javadoc.
Also, the meaning of the timestamp can vary according to the Kafka configuration (log.message.timestamp.type); please refer to the Kafka documentation for further details.
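Similarly, for a batch query both boundaries can be given by timestamp; the sketch below mirrors the new KafkaRelationSuite test, again with placeholder topic, broker, and timestamp values and the assumed "-1" global-timestamp key.

```scala
// Hedged sketch: batch read bounded by timestamps on both sides. The "-1"
// partition key is assumed to be the per-topic global timestamp.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-batch-offsets-by-timestamp").getOrCreate()

val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA")
  .option("startingOffsetsByTimestamp", """{"topicA": {"-1": 1000}}""")
  .option("endingOffsetsByTimestamp", """{"topicA": {"-1": 2000}}""")
  .load()
  .selectExpr("CAST(value AS STRING)")
```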
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 0179f4dd822f..387211d403b0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -49,6 +49,9 @@ private[kafka010] class KafkaOffsetReader(
val driverKafkaParams: ju.Map[String, Object],
readerOptions: CaseInsensitiveMap[String],
driverGroupIdPrefix: String) extends Logging {
+
+ import KafkaOffsetReader._
+
/**
* Used to ensure fetch operations execute in an UninterruptibleThread
*/
@@ -166,83 +169,118 @@ private[kafka010] class KafkaOffsetReader(
def fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
reportDataLoss: String => Unit): KafkaSourceOffset = {
- val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
- assert(partitions.asScala == partitionOffsets.keySet,
- "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
- "Use -1 for latest, -2 for earliest, if you don't care.\n" +
- s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
- logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
+ val fnRetrievePartitionOffsets: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = {
+ case (newParams, _) => newParams
}
- val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ =>
- partitionOffsets
- }
+ fetchSpecificOffsets0(partitionOffsets, adjustParamsWithPartitionsForOffsets,
+ fnRetrievePartitionOffsets, assertFetchedOffsetsForOffsets(reportDataLoss))
+ }
- val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched =>
- partitionOffsets.foreach {
- case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
- off != KafkaOffsetRangeLimit.EARLIEST =>
- if (fetched(tp) != off) {
- reportDataLoss(
- s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
- }
- case _ =>
- // no real way to check that beginning or end is reasonable
- }
- }
+ private def adjustParamsWithPartitionsForOffsets
+ : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (params, partitions) =>
+ assert(partitions.asScala == params.keySet,
+ "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+ "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+ s"Specified: ${params.keySet} Assigned: ${partitions.asScala}")
+ logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $params")
+ params
+ }
- fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
- fnAssertFetchedOffsets)
+ private def assertFetchedOffsetsForOffsets(reportDataLoss: String => Unit)
+ : (TPToOffsets, TPToOffsets) => Unit = { case (newParams, fetched) =>
+ newParams.foreach {
+ case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+ off != KafkaOffsetRangeLimit.EARLIEST =>
+ if (fetched(tp) != off) {
+ reportDataLoss(
+ s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+ }
+ case _ =>
+ // no real way to check that beginning or end is reasonable
+ }
}
def fetchSpecificTimestampBasedOffsets(
partitionTimestamps: Map[TopicPartition, Long],
failsOnNoMatchingOffset: Boolean): KafkaSourceOffset = {
- val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
- assert(partitions.asScala == partitionTimestamps.keySet,
- "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
- s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
- logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
- }
+ val fnAssertFetchedOffsets: (TPToOffsets, TPToOffsets) => Unit = { (_, _) => }
- val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
- val converted = partitionTimestamps.map { case (tp, timestamp) =>
- tp -> java.lang.Long.valueOf(timestamp)
- }.asJava
+ fetchSpecificOffsets0(partitionTimestamps,
+ adjustParamsWithPartitionsForTimestampBasedOffset,
+ retrievePartitionOffsetsForTimestampBasedOffset(failsOnNoMatchingOffset),
+ fnAssertFetchedOffsets)
+ }
- val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
- consumer.offsetsForTimes(converted)
+ private def adjustParamsWithPartitionsForTimestampBasedOffset
+ : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (params, partitions) =>
+ val paramsGroupedByTopic = params.groupBy(_._1.topic())
+ val partitionsGroupedByTopic = partitions.asScala.groupBy(_.topic())
+
+ assert(paramsGroupedByTopic.keySet == partitionsGroupedByTopic.keySet,
+ s"Not all specified topics are assigned. Specified: ${params.keySet} " +
+ s"Assigned: ${partitions.asScala}")
+
+ val newParams: Map[TopicPartition, Long] = paramsGroupedByTopic.map {
+ case (topic, tpToOffset) =>
+ if (tpToOffset.keySet.map(_.partition()).contains(GLOBAL_PARTITION_NUM)) {
+ // global timestamp has been set for all partitions in topic
+
+ // we disallow overriding timestamp per partition for simplicity
+ assert(tpToOffset.size == 1, "Global timestamp for topic cannot be set along with " +
+ "specifying partition(s). Specify only global timestamp for each topic if you want " +
+ s"to use global timestamp. Configuration error on topic $topic, specified: $params")
+
+ val partsForTopic = partitionsGroupedByTopic(topic)
+ val timestampOffset = tpToOffset(new TopicPartition(topic, GLOBAL_PARTITION_NUM))
+
+ partsForTopic.map { part => part -> timestampOffset }.toMap
+ } else {
+ val partsForTopic = partitionsGroupedByTopic(topic)
+ assert(tpToOffset.keySet == partsForTopic.toSet,
+ "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify" +
+ s" all TopicPartitions. Specified: ${params.keySet} Assigned: " +
+ s"${partitions.asScala}")
+ tpToOffset
+ }
+ }.reduceLeft { (acc: TPToOffsets, tpToOffsets: TPToOffsets) => acc ++ tpToOffsets }
- offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
- if (failsOnNoMatchingOffset) {
- assert(offsetAndTimestamp != null, "No offset matched from request of " +
- s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
- }
+ logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $newParams")
+ newParams
+ }
- if (offsetAndTimestamp == null) {
- tp -> KafkaOffsetRangeLimit.LATEST
- } else {
- tp -> offsetAndTimestamp.offset()
- }
- }.toMap
- }
- }
+ private def retrievePartitionOffsetsForTimestampBasedOffset(failsOnNoMatchingOffset: Boolean)
+ : (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets = { case (newPartitionTimestamps, _) =>
+ val converted = newPartitionTimestamps.map { case (tp, timestamp) =>
+ tp -> java.lang.Long.valueOf(timestamp)
+ }.asJava
- val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
+ val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
+ consumer.offsetsForTimes(converted)
- fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
- fnAssertFetchedOffsets)
+ offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
+ if (failsOnNoMatchingOffset) {
+ assert(offsetAndTimestamp != null, "No offset matched from request of " +
+ s"topic-partition $tp and timestamp ${newPartitionTimestamps(tp)}.")
+ }
+
+ if (offsetAndTimestamp == null) {
+ tp -> KafkaOffsetRangeLimit.LATEST
+ } else {
+ tp -> offsetAndTimestamp.offset()
+ }
+ }.toMap
}
private def fetchSpecificOffsets0(
- fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
- fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],
- fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit): KafkaSourceOffset = {
+ partitionTimestamps: TPToOffsets,
+ fnAdjustParamsWithPartitions: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets,
+ fnRetrievePartitionOffsets: (TPToOffsets, ju.Set[TopicPartition]) => TPToOffsets,
+ fnAssertFetchedOffsets: (TPToOffsets, TPToOffsets) => Unit): KafkaSourceOffset = {
val fetched = partitionsAssignedToConsumer {
partitions => {
- fnAssertParametersWithPartitions(partitions)
-
- val partitionOffsets = fnRetrievePartitionOffsets(partitions)
+ val newPartTimestamps = fnAdjustParamsWithPartitions(partitionTimestamps, partitions)
+ val partitionOffsets = fnRetrievePartitionOffsets(newPartTimestamps, partitions)
partitionOffsets.foreach {
case (tp, KafkaOffsetRangeLimit.LATEST) =>
@@ -252,14 +290,16 @@ private[kafka010] class KafkaOffsetReader(
case (tp, off) => consumer.seek(tp, off)
}
- partitionOffsets.map {
+ val fetchedOffsets = partitionOffsets.map {
case (tp, _) => tp -> consumer.position(tp)
}
+
+ fnAssertFetchedOffsets(newPartTimestamps, fetchedOffsets)
+
+ fetchedOffsets
}
}
- fnAssertFetchedOffsets(fetched)
-
KafkaSourceOffset(fetched)
}
@@ -478,3 +518,11 @@ private[kafka010] class KafkaOffsetReader(
_consumer = null // will automatically get reinitialized again
}
}
+
+private[kafka010] object KafkaOffsetReader {
+ // This type alias only exists to shorten lines; please avoid exposing it
+ // outside of KafkaOffsetReader, e.g. as a parameter/return type of public methods.
+ private type TPToOffsets = Map[TopicPartition, Long]
+
+ val GLOBAL_PARTITION_NUM = -1
+}
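For readers following the new control flow: conceptually, an entry keyed by GLOBAL_PARTITION_NUM applies its timestamp to every assigned partition of that topic, while concrete per-partition entries are kept as-is (see adjustParamsWithPartitionsForTimestampBasedOffset above). A simplified, standalone sketch of that expansion, with made-up topic, partitions, and timestamp, and with the patch's assertions omitted:

```scala
import org.apache.kafka.common.TopicPartition

// Simplified illustration of the expansion performed in
// adjustParamsWithPartitionsForTimestampBasedOffset: a sentinel (-1) entry is
// replaced by one entry per assigned partition of its topic; other entries
// pass through unchanged. Validation and error handling are omitted.
object GlobalTimestampExpansionSketch {
  private val GlobalPartitionNum = -1 // mirrors KafkaOffsetReader.GLOBAL_PARTITION_NUM

  def expand(
      params: Map[TopicPartition, Long],
      assigned: Set[TopicPartition]): Map[TopicPartition, Long] = {
    params.flatMap { case (tp, ts) =>
      if (tp.partition() == GlobalPartitionNum) {
        assigned.filter(_.topic() == tp.topic()).map(_ -> ts)
      } else {
        Seq(tp -> ts)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val assigned = (0 to 2).map(new TopicPartition("topicA", _)).toSet
    val params = Map(new TopicPartition("topicA", GlobalPartitionNum) -> 1000L)
    // prints topicA-0, topicA-1, topicA-2 each mapped to 1000
    println(expand(params, assigned))
  }
}
```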
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 26136203b09a..1f0384d612da 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1288,6 +1288,16 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
"failOnDataLoss" -> failOnDataLoss.toString)
}
+ test(s"assign from global timestamp per topic (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromGlobalTimestamp(
+ topic,
+ failOnDataLoss = failOnDataLoss,
+ addPartitions = false,
+ "assign" -> assignString(topic, 0 to 4),
+ "failOnDataLoss" -> failOnDataLoss.toString)
+ }
+
test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
val topic = newTopic()
testFromLatestOffsets(
@@ -1317,6 +1327,13 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
"subscribe" -> topic)
}
+ test(s"subscribing topic by name from global timestamp per topic" +
+ s" (failOnDataLoss: $failOnDataLoss)") {
+ val topic = newTopic()
+ testFromGlobalTimestamp(topic, failOnDataLoss = failOnDataLoss, addPartitions = true,
+ "subscribe" -> topic)
+ }
+
test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-suffix"
@@ -1356,6 +1373,17 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
addPartitions = true,
"subscribePattern" -> s"$topicPrefix-.*")
}
+
+ test(s"subscribing topic by pattern from global timestamp per topic " +
+ s"(failOnDataLoss: $failOnDataLoss)") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromGlobalTimestamp(
+ topic,
+ failOnDataLoss = failOnDataLoss,
+ addPartitions = true,
+ "subscribePattern" -> s"$topicPrefix-.*")
+ }
}
test("bad source options") {
@@ -1505,28 +1533,11 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
failOnDataLoss: Boolean,
addPartitions: Boolean,
options: (String, String)*): Unit = {
- def sendMessages(topic: String, msgs: Seq[String], part: Int, ts: Long): Unit = {
- val records = msgs.map { msg =>
- new RecordBuilder(topic, msg).partition(part).timestamp(ts).build()
- }
- testUtils.sendMessages(records)
- }
-
testUtils.createTopic(topic, partitions = 5)
val firstTimestamp = System.currentTimeMillis() - 5000
- sendMessages(topic, Array(-20).map(_.toString), 0, firstTimestamp)
- sendMessages(topic, Array(-10).map(_.toString), 1, firstTimestamp)
- sendMessages(topic, Array(0, 1).map(_.toString), 2, firstTimestamp)
- sendMessages(topic, Array(10, 11).map(_.toString), 3, firstTimestamp)
- sendMessages(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp)
-
val secondTimestamp = firstTimestamp + 1000
- sendMessages(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp)
- sendMessages(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp)
- sendMessages(topic, Array(2).map(_.toString), 2, secondTimestamp)
- sendMessages(topic, Array(12).map(_.toString), 3, secondTimestamp)
- // no data after second timestamp for partition 4
+ setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -1537,19 +1548,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
) ++ Map(new TopicPartition(topic, 4) -> firstTimestamp)
val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps)
- val reader = spark
- .readStream
- .format("kafka")
- .option("startingOffsetsByTimestamp", startingTimestamps)
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("failOnDataLoss", failOnDataLoss.toString)
- options.foreach { case (k, v) => reader.option(k, v) }
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
+ val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss,
+ options: _*)
testStream(mapped)(
makeSureGetOffsetCalled,
Execute { q =>
@@ -1576,6 +1576,108 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
)
}
+ private def testFromGlobalTimestamp(
+ topic: String,
+ failOnDataLoss: Boolean,
+ addPartitions: Boolean,
+ options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+
+ val firstTimestamp = System.currentTimeMillis() - 5000
+ val secondTimestamp = firstTimestamp + 1000
+ setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
+ // here we should add records to partition 4 which match the second timestamp,
+ // as the query will break if there are no matching records
+ sendMessagesWithTimestamp(topic, Array(23, 24).map(_.toString), 4, secondTimestamp)
+
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ // we intentionally start from the second timestamp for all partitions
+ // by setting the global partition
+ val startPartitionTimestamps: Map[TopicPartition, Long] = Map(
+ new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> secondTimestamp)
+ val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps)
+
+ val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss,
+ options: _*)
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ Execute { q =>
+ // wait to reach the last offset in every partition
+ val partAndOffsets = (0 to 4).map(new TopicPartition(topic, _)).map { tp =>
+ if (tp.partition() < 4) {
+ tp -> 3L
+ } else {
+ tp -> 5L // we added 2 more records to partition 4
+ }
+ }.toMap
+ q.awaitOffset(0, KafkaSourceOffset(partAndOffsets), streamingTimeout.toMillis)
+ },
+ CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24),
+ StopStream,
+ StartStream(),
+ CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24), // Should get the data back on recovery
+ StopStream,
+ AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped
+ StartStream(),
+ CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32), // Should get the added data
+ AssertOnQuery("Add partitions") { query: StreamExecution =>
+ if (addPartitions) setTopicPartitions(topic, 10, query)
+ true
+ },
+ AddKafkaData(Set(topic), 40, 41, 42, 43, 44)(ensureDataInMultiplePartition = true),
+ CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32, 40, 41, 42, 43, 44),
+ StopStream
+ )
+ }
+
+ private def sendMessagesWithTimestamp(
+ topic: String,
+ msgs: Seq[String],
+ part: Int,
+ ts: Long): Unit = {
+ val records = msgs.map { msg =>
+ new RecordBuilder(topic, msg).partition(part).timestamp(ts).build()
+ }
+ testUtils.sendMessages(records)
+ }
+
+ private def setupTestMessagesForTestOnTimestampOffsets(
+ topic: String,
+ firstTimestamp: Long,
+ secondTimestamp: Long): Unit = {
+ sendMessagesWithTimestamp(topic, Array(-20).map(_.toString), 0, firstTimestamp)
+ sendMessagesWithTimestamp(topic, Array(-10).map(_.toString), 1, firstTimestamp)
+ sendMessagesWithTimestamp(topic, Array(0, 1).map(_.toString), 2, firstTimestamp)
+ sendMessagesWithTimestamp(topic, Array(10, 11).map(_.toString), 3, firstTimestamp)
+ sendMessagesWithTimestamp(topic, Array(20, 21, 22).map(_.toString), 4, firstTimestamp)
+
+ sendMessagesWithTimestamp(topic, Array(-21, -22).map(_.toString), 0, secondTimestamp)
+ sendMessagesWithTimestamp(topic, Array(-11, -12).map(_.toString), 1, secondTimestamp)
+ sendMessagesWithTimestamp(topic, Array(2).map(_.toString), 2, secondTimestamp)
+ sendMessagesWithTimestamp(topic, Array(12).map(_.toString), 3, secondTimestamp)
+ // no data after second timestamp for partition 4
+ }
+
+ private def setupDataFrameForTestOnTimestampOffsets(
+ startingTimestamps: String,
+ failOnDataLoss: Boolean,
+ options: (String, String)*): Dataset[_] = {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("startingOffsetsByTimestamp", startingTimestamps)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("failOnDataLoss", failOnDataLoss.toString)
+ options.foreach { case (k, v) => reader.option(k, v) }
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+ mapped
+ }
+
test("Kafka column types") {
val now = System.currentTimeMillis()
val topic = newTopic()
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 556eab4b5638..dd80a276a553 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -268,6 +268,24 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
}, topic, 0 to 19)
}
+ test("global timestamp provided for starting and ending") {
+ val (topic, timestamps) = prepareTimestampRelatedUnitTest
+
+ // both starting and ending timestamps are provided: starting "first", ending "finalized"
+ verifyTimestampRelatedQueryResult({ df =>
+ val startPartitionTimestamps: Map[TopicPartition, Long] = Map(
+ new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> timestamps(1))
+ val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps)
+
+ val endPartitionTimestamps = Map(
+ new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> timestamps(2))
+ val endingTimestamps = JsonUtils.partitionTimestamps(endPartitionTimestamps)
+
+ df.option("startingOffsetsByTimestamp", startingTimestamps)
+ .option("endingOffsetsByTimestamp", endingTimestamps)
+ }, topic, 10 to 19)
+ }
+
test("no matched offset for timestamp - startingOffsets") {
val (topic, timestamps) = prepareTimestampRelatedUnitTest
@@ -284,19 +302,40 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
}, topic, Seq.empty)
}
- @tailrec
- def assertionErrorInExceptionChain(e: Throwable): Boolean = {
- if (e.isInstanceOf[AssertionError]) {
- true
- } else if (e.getCause == null) {
- false
- } else {
- assertionErrorInExceptionChain(e.getCause)
- }
+ assert(findAssertionErrorInExceptionChain(e).isDefined,
+ "Cannot find expected AssertionError in chained exceptions")
+ }
+
+ test("specifying both global timestamp and specific timestamp for partition") {
+ val (topic, timestamps) = prepareTimestampRelatedUnitTest
+
+ val e = intercept[SparkException] {
+ verifyTimestampRelatedQueryResult({ df =>
+ val startTopicTimestamps = Map(
+ new TopicPartition(topic, KafkaOffsetReader.GLOBAL_PARTITION_NUM) -> timestamps(1),
+ new TopicPartition(topic, 1) -> timestamps(2)
+ )
+ val startingTimestamps = JsonUtils.partitionTimestamps(startTopicTimestamps)
+
+ df.option("startingOffsetsByTimestamp", startingTimestamps)
+ }, topic, Seq.empty)
}
- assert(assertionErrorInExceptionChain(e),
- "Cannot find expected AssertionError in chained exceptions")
+ val actualError = findAssertionErrorInExceptionChain(e)
+ assert(actualError.isDefined, "Cannot find expected AssertionError in chained exceptions")
+ assert(actualError.get.getMessage.contains(
+ "Global timestamp for topic cannot be set along with specifying partition"))
+ }
+
+ @tailrec
+ private def findAssertionErrorInExceptionChain(e: Throwable): Option[Throwable] = {
+ if (e.isInstanceOf[AssertionError]) {
+ Some(e)
+ } else if (e.getCause == null) {
+ None
+ } else {
+ findAssertionErrorInExceptionChain(e.getCause)
+ }
}
test("no matched offset for timestamp - endingOffsets") {