diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index c64b0706f62b..fd0b10bbcd1e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -122,6 +122,16 @@ private[kafka010] class KafkaOffsetReader( partitions.asScala.toSet } + def fetchOffsetsByTime(times: Map[TopicPartition, Long]): + Map[TopicPartition, Option[Long]] = runUninterruptibly { + assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) + + consumer.offsetsForTimes(times.map{case (k, v) => k -> long2Long(v)}.asJava) + .asScala.map{case (k, v) => + k -> (if (v != null) Some(Long2long(v.offset())) else None) + }.toMap + } + /** * Resolves the specific offsets based on Kafka seek positions. * This method resolves offset value -1 to the latest and -2 to the @@ -394,6 +404,8 @@ private[kafka010] class KafkaOffsetReader( } private[kafka010] object KafkaOffsetReader { + // offsets are not instances of Optional, we need special state for None + val EMPTY_OFFSET: Long = -100L def kafkaSchema: StructType = StructType(Seq( StructField("key", BinaryType), diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index 9effa2959110..34de34729025 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.kafka010 +import java.sql.Timestamp import java.util.UUID +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging @@ -26,11 +28,12 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.sources.{BaseRelation, TableScan} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String + private[kafka010] class KafkaRelation( override val sqlContext: SQLContext, strategy: ConsumerStrategy, @@ -39,7 +42,7 @@ private[kafka010] class KafkaRelation( failOnDataLoss: Boolean, startingOffsets: KafkaOffsetRangeLimit, endingOffsets: KafkaOffsetRangeLimit) - extends BaseRelation with TableScan with Logging { + extends BaseRelation with PrunedFilteredScan with Logging { assert(startingOffsets != LatestOffsetRangeLimit, "Starting offset not allowed to be set to latest offsets.") assert(endingOffsets != EarliestOffsetRangeLimit, @@ -54,7 +57,7 @@ private[kafka010] class KafkaRelation( override def schema: StructType = KafkaOffsetReader.kafkaSchema - override def buildScan(): RDD[Row] = { + def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { // Each running query should use its own group id. Otherwise, the query may be only assigned // partial data since Kafka will assign partitions to multiple consumers having the same group // id. Hence, we should generate a unique id for each query. 
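For context on the helper introduced above: `fetchOffsetsByTime` is a thin wrapper around the consumer's `offsetsForTimes` call, which maps a timestamp to the earliest offset whose record timestamp is equal or greater, and returns null for partitions that have no such record. A minimal standalone sketch of that lookup (illustrative only, not part of the patch; `offsetsAtOrAfter` and its parameters are invented names):

import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Resolve, per partition, the first offset whose record timestamp is >= timestampMs.
// offsetsForTimes() returns null for partitions with no such record; that is surfaced as None.
def offsetsAtOrAfter(
    consumer: Consumer[Array[Byte], Array[Byte]],
    partitions: Set[TopicPartition],
    timestampMs: Long): Map[TopicPartition, Option[Long]] = {
  val query: ju.Map[TopicPartition, java.lang.Long] =
    partitions.map(tp => tp -> long2Long(timestampMs)).toMap.asJava
  consumer.offsetsForTimes(query).asScala.map { case (tp, offsetAndTs) =>
    tp -> Option(offsetAndTs).map(_.offset())
  }.toMap
}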
@@ -67,35 +70,10 @@ private[kafka010] class KafkaRelation( driverGroupIdPrefix = s"$uniqueGroupId-driver") // Leverage the KafkaReader to obtain the relevant partition offsets - val (fromPartitionOffsets, untilPartitionOffsets) = { - try { - (getPartitionOffsets(kafkaOffsetReader, startingOffsets), - getPartitionOffsets(kafkaOffsetReader, endingOffsets)) - } finally { - kafkaOffsetReader.close() - } - } - - // Obtain topicPartitions in both from and until partition offset, ignoring - // topic partitions that were added and/or deleted between the two above calls. - if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) { - implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic()) - val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",") - val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",") - throw new IllegalStateException("different topic partitions " + - s"for starting offsets topics[${fromTopics}] and " + - s"ending offsets topics[${untilTopics}]") - } - - // Calculate offset ranges - val offsetRanges = untilPartitionOffsets.keySet.map { tp => - val fromOffset = fromPartitionOffsets.getOrElse(tp, - // This should not happen since topicPartitions contains all partitions not in - // fromPartitionOffsets - throw new IllegalStateException(s"$tp doesn't have a from offset")) - val untilOffset = untilPartitionOffsets(tp) - KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, None) - }.toArray + val offsetRanges = new PartitionOffsetCalculator(filters, kafkaOffsetReader) + .calculateOffsets() + .checkStartEndHavingSamePartitions() + .calculateRDDOffsetRanges(filters) logInfo("GetBatch generating RDD of offset range: " + offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) @@ -103,47 +81,261 @@ private[kafka010] class KafkaRelation( // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays. val executorKafkaParams = KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId) + val requiredColumnsList = requiredColumns.toList val rdd = new KafkaSourceRDD( sqlContext.sparkContext, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr => - InternalRow( - cr.key, - cr.value, - UTF8String.fromString(cr.topic), - cr.partition, - cr.offset, - DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)), - cr.timestampType.id) - } - sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema).rdd + val columns = new ConsumerRecordInspector(cr).getValues(requiredColumnsList) + InternalRow.fromSeq(columns) + } + val schemaProjected = StructType(requiredColumns.map{schema(_)}) + sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schemaProjected).rdd + } + + override def toString: String = + s"KafkaRelation(strategy=$strategy, start=$startingOffsets, end=$endingOffsets)" + + case class PartitionOffsetsRange(start: Map[TopicPartition, Long], + end: Map[TopicPartition, Long]) { + + def checkStartEndHavingSamePartitions(): PartitionOffsetsRange = { + // Obtain topicPartitions in both from and until partition offset, ignoring + // topic partitions that were added and/or deleted between the two above calls. 
+ if (start.keySet != end.keySet) { + implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic()) + val fromTopics = start.keySet.toList.sorted.mkString(",") + val untilTopics = end.keySet.toList.sorted.mkString(",") + throw new IllegalStateException("different topic partitions " + + s"for starting offsets topics[${fromTopics}] and " + + s"ending offsets topics[${untilTopics}]") + } + this + } + + def calculateRDDOffsetRanges(filters: Array[Filter]): + Array[KafkaSourceRDDOffsetRange] = { + end.keySet.map { tp => + val fromOffset = start.getOrElse(tp, + // This should not happen since topicPartitions contains all partitions not in + // fromPartitionOffsets + throw new IllegalStateException(s"$tp doesn't have a from offset")) + var untilOffset = end(tp) + untilOffset = if (areOffsetsInLine(fromOffset, untilOffset)) { + untilOffset + } else { + logWarning(s"Predicate(s) out of offset range for partition: $tp. " + + s"Range start: $startingOffsets, range end: $endingOffsets. " + + s"Predicates: ${filters.mkString(",")}") + fromOffset + } + KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, None) + }.toArray + } + + private def areOffsetsInLine(fromOffset: Long, untilOffset: Long): Boolean = { + untilOffset >= fromOffset || untilOffset < 0 || fromOffset < 0 + } } - private def getPartitionOffsets( - kafkaReader: KafkaOffsetReader, - kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = { - def validateTopicPartitions(partitions: Set[TopicPartition], - partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { - assert(partitions == partitionOffsets.keySet, - "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + - "Use -1 for latest, -2 for earliest, if you don't care.\n" + - s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}") - logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") - partitionOffsets - } - val partitions = kafkaReader.fetchTopicPartitions() - // Obtain TopicPartition offsets with late binding support - kafkaOffsets match { - case EarliestOffsetRangeLimit => partitions.map { - case tp => tp -> KafkaOffsetRangeLimit.EARLIEST - }.toMap - case LatestOffsetRangeLimit => partitions.map { - case tp => tp -> KafkaOffsetRangeLimit.LATEST - }.toMap - case SpecificOffsetRangeLimit(partitionOffsets) => - validateTopicPartitions(partitions, partitionOffsets) + case class PartitionOffsetRangeOpts(start: Map[TopicPartition, Option[Long]], + end: Map[TopicPartition, Option[Long]]) { + + /** + * When we request a timestamp -> Kafka offset mapping, + * Kafka may return null for some partitions. + * This means that the partition doesn't contain any record + * whose timestamp is greater than or equal to the given timestamp. + * If either the start or the end offset is None, then the user provided + * a predicate which doesn't match any record in the partition. + * We change both start and end to 0, so the given partition will not serve data. 
+ * + * @return specific partition offsets (not Option) + */ + def invalidateNoneOffsets(): PartitionOffsetsRange = { + val merged = start.map { case (k, v) => k -> ((v, end(k)))} + val invalidated = merged.map { + case(k, (None, _)) => k -> ((0L, 0L)) + case(k, (_, None)) => k -> ((0L, 0L)) + case(k, (s, e)) => k -> ((s.get, e.get)) + } + PartitionOffsetsRange( + start = invalidated.map{case(k, (s, _)) => k -> s}, + end = invalidated.map{case(k, (_, e)) => k -> e}) } } - override def toString: String = - s"KafkaRelation(strategy=$strategy, start=$startingOffsets, end=$endingOffsets)" + class PartitionOffsetCalculator( + filters: Array[Filter], + kafkaReader: KafkaOffsetReader) { + + def calculateOffsets(): PartitionOffsetsRange = { + try { + calculateStartAndEndOffsets().invalidateNoneOffsets() + } finally { + kafkaReader.close() + } + } + + private def calculateStartAndEndOffsets(): PartitionOffsetRangeOpts = { + PartitionOffsetRangeOpts( + start = getStartingPartitionOffsets(filters), + end = getEndingPartitionOffsets(filters) + ) + } + + private def getEndingPartitionOffsets( + filters: Array[Filter]): Map[TopicPartition, Option[Long]] = { + + val offsetsByDSOpts = getOffsetsByDataSourceOpts(endingOffsets) + mergeWithEndingOffsetsByFilter(offsetsByDSOpts) + } + + private def getStartingPartitionOffsets( + filters: Array[Filter]): Map[TopicPartition, Option[Long]] = { + + val offsetsByDSOpts = getOffsetsByDataSourceOpts(startingOffsets) + mergeWithStartingOffsetsByFilter(offsetsByDSOpts) + } + + /** + * Old versions of Kafka that doesn't support timestamp APIs will + * log error message but continue to work without pushdown + */ + private def mergeWithEndingOffsetsByFilter(offsetsByDSOpts: Map[TopicPartition, Long]) = { + try { + mergeWithEndingOffsetsByFilterUnsafe(offsetsByDSOpts) + } catch { case e: Exception => + handleTimestampFetchError(offsetsByDSOpts, e) + } + } + + /** + * Old versions of Kafka that doesn't support timestamp APIs will + * log error message but continue to work without pushdown + */ + private def mergeWithStartingOffsetsByFilter(offsetsByDSOpts: Map[TopicPartition, Long]) = { + try { + mergeWithStartingOffsetsByFilterUnsafe(offsetsByDSOpts) + } catch { case e: Exception => + handleTimestampFetchError(offsetsByDSOpts, e) + } + } + + private def handleTimestampFetchError( + offsetsByDSOpts: Map[TopicPartition, Long], e: Exception) = { + logError( + "Couldn't fetch offsetForTimestamps from Kafka." + + "Timestamp will not be pushed-down to kafka.", e) + offsetsByDSOpts.mapValues(Option(_)) + } + + private def getOffsetsByDataSourceOpts( + kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = { + def validateTopicPartitions(partitions: Set[TopicPartition], + partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { + assert(partitions == partitionOffsets.keySet, + "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + + "Use -1 for latest, -2 for earliest, if you don't care.\n" + + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}") + logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to $partitionOffsets") + partitionOffsets + } + val partitions = kafkaReader.fetchTopicPartitions() + // Obtain TopicPartition offsets with late binding support + kafkaOffsets match { + case EarliestOffsetRangeLimit => partitions.map { + case tp => tp -> KafkaOffsetRangeLimit.EARLIEST + }.toMap + case LatestOffsetRangeLimit => partitions.map { + case tp => tp -> KafkaOffsetRangeLimit.LATEST + }.toMap + case SpecificOffsetRangeLimit(partitionOffsets) => + validateTopicPartitions(partitions, partitionOffsets) + } + } + + private val TIMESTAMP_ATTR = "timestamp" + + private def mergeWithStartingOffsetsByFilterUnsafe( + toMerge : Map[TopicPartition, Long]): Map[TopicPartition, Option[Long]] = { + // Generates array of Tuples (topicPartition -> Option(filterOffset)) for each filter + val partitionWithOffsetArray: Array[(TopicPartition, Option[Long])] = filters.flatMap { + case op: GreaterThan if op.attribute == TIMESTAMP_ATTR => + val times = toMerge.map { case (tp, _) => + tp -> (op.value.asInstanceOf[Timestamp].getTime + 1)} + kafkaReader.fetchOffsetsByTime(times) + case op: EqualTo if op.attribute == TIMESTAMP_ATTR => + val times = toMerge.map { case (tp, _) => + tp -> op.value.asInstanceOf[Timestamp].getTime} + kafkaReader.fetchOffsetsByTime(times) + case op: GreaterThanOrEqual if op.attribute == TIMESTAMP_ATTR => + val times = toMerge.map { case (tp, _) => + tp -> op.value.asInstanceOf[Timestamp].getTime} + kafkaReader.fetchOffsetsByTime(times) + case _ => None + } + // Let's take the maximum of all filterOffsets per partition + // If any partition have None assigned we need to return None + val filterOffsets: Map[TopicPartition, Option[Long]] = + partitionWithOffsetArray.groupBy(_._1) + .mapValues(_.map{_._2}.maxBy(_.getOrElse(Long.MaxValue))) + // Take maximum of filterOffsets and offsets from argument per partition + toMerge.map {case (tp, offset) => + val filterOffset: Option[Long] = filterOffsets.getOrElse(tp, Some(offset)) + tp -> filterOffset.flatMap{o => Some(Math.max(offset, o))} + } + } + + private def mergeWithEndingOffsetsByFilterUnsafe( + toMerge : Map[TopicPartition, Long]): Map[TopicPartition, Option[Long]] = { + // Generates array of Tuples (topicPartition -> Option(filterOffset)) for each filter + val partitionWithOffsetArray = filters.flatMap { + case op: LessThan if op.attribute == TIMESTAMP_ATTR => + val times = toMerge.map { case (tp, _) => + tp -> op.value.asInstanceOf[Timestamp].getTime} + kafkaReader.fetchOffsetsByTime(times) + case op: LessThanOrEqual if op.attribute == TIMESTAMP_ATTR => + val times = toMerge.map { case (tp, _) => + tp -> (op.value.asInstanceOf[Timestamp].getTime + 1)} + kafkaReader.fetchOffsetsByTime(times) + case op: EqualTo if op.attribute == TIMESTAMP_ATTR => + val times = toMerge.map { case (tp, _) => + tp -> (op.value.asInstanceOf[Timestamp].getTime + 1)} + kafkaReader.fetchOffsetsByTime(times) + case _ => None + } + // Let's take the minimum of all filterOffsets per partition + // Natural ordering of "min" will return None if any partition have None assigned + val filterOffsets: Map[TopicPartition, Option[Long]] = + partitionWithOffsetArray.groupBy(_._1).mapValues(_.map{_._2}.min) + // Take minimum of filterOffsets and offsets from argument per partition + toMerge.map {case (tp, offset) => + var newOffset: Option[Long] = filterOffsets.getOrElse(tp, Some(offset)) + if (isNotLatestOrEarliest(offset)) { + newOffset = newOffset.flatMap{o => Option(Math.min(offset, o))} + } + tp -> newOffset + } + } + + private def 
isNotLatestOrEarliest(offset: Long): Boolean = { + offset >= 0 + } + } +} + +class ConsumerRecordInspector(cr: ConsumerRecord[Array[Byte], Array[Byte]]) { + def getValues(requiredColumns: List[String]) : Seq[Any] = { + requiredColumns match { + case "key"::rest => cr.key +: getValues(rest) + case "value"::rest => cr.value +: getValues(rest) + case "topic"::rest => UTF8String.fromString(cr.topic) +: getValues(rest) + case "partition"::rest => cr.partition +: getValues(rest) + case "offset"::rest => cr.offset +: getValues(rest) + case "timestamp"::rest => DateTimeUtils. + fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)) +: getValues(rest) + case "timestampType"::rest => cr.timestampType.id +: getValues(rest) + case Seq() => Seq.empty + } + } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 2cd13a994ee8..d57dd9708411 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -44,6 +44,17 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p))) } + private def buildOffsetRestriction(topic: String, offsetMap: Map[Int, Long]) = { + val startPartitionOffsets = offsetMap.map{case(k, v) => new TopicPartition(topic, k) -> v } + JsonUtils.partitionOffsets(startPartitionOffsets) + } + + private def buildDDLOptions(topic: String, start: Map[Int, Long], end: Map[Int, Long]) = { + val startingOffsets: String = buildOffsetRestriction(topic, start) + val endingOffsets: String = buildOffsetRestriction(topic, end) + Map("startingOffsets" -> startingOffsets, "endingOffsets" -> endingOffsets) + } + override def beforeAll(): Unit = { super.beforeAll() testUtils = new KafkaTestUtils @@ -51,12 +62,9 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest } override def afterAll(): Unit = { - try { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - } - } finally { + if (testUtils != null) { + testUtils.teardown() + testUtils = null super.afterAll() } } @@ -65,6 +73,15 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest topic: String, withOptions: Map[String, String] = Map.empty[String, String], brokerAddress: Option[String] = None) = { + createKafkaDF(topic, withOptions, brokerAddress) + .selectExpr("CAST(value AS STRING)") + } + + + private def createKafkaDF( + topic: String, + withOptions: Map[String, String] = Map.empty[String, String], + brokerAddress: Option[String] = None) = { val df = spark .read .format("kafka") @@ -74,10 +91,9 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest withOptions.foreach { case (key, value) => df.option(key, value) } - df.load().selectExpr("CAST(value AS STRING)") + df.load() } - test("explicit earliest to latest offsets") { val topic = newTopic() testUtils.createTopic(topic, partitions = 3) @@ -130,7 +146,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest ) val endingOffsets = JsonUtils.partitionOffsets(endPartitionOffsets) val df = createDF(topic, - withOptions = Map("startingOffsets" -> startingOffsets, "endingOffsets" -> endingOffsets)) + withOptions = Map("startingOffsets" -> startingOffsets, "endingOffsets" -> 
endingOffsets)) checkAnswer(df, (0 to 20).map(_.toString).toDF) // static offset partition 2, nothing should change @@ -161,9 +177,9 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest var kafkaUtils: KafkaTestUtils = null try { /** - * The following settings will ensure that all log entries - * are removed following a call to cleanupLogs - */ + * The following settings will ensure that all log entries + * are removed following a call to cleanupLogs + */ val brokerProps = Map[String, Object]( "log.retention.bytes" -> 1.asInstanceOf[AnyRef], // retain nothing "log.retention.ms" -> 1.asInstanceOf[AnyRef] // no wait time @@ -242,6 +258,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") } + test("allow group.id overriding") { // Tests code path KafkaSourceProvider.createRelation(.) val topic = newTopic() @@ -360,4 +377,316 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest checkAnswer(df, (1 to 15).map(_.toString).toDF) } } + + test("specific columns projection") { + val topic = newTopic() + testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0)) + val df = createKafkaDF(topic) + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic") + checkAnswer(df, (0 to 9).map(x => (null, x.toString, topic)).toDF("key", "value", "topic")) + } + + test("timestamp pushdown greaterThan") { + val topic = newTopic() + testUtils.createTopic(topic, 4) + + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 24, 3, 5001001) + testUtils.sendMessagesOverPartitions(topic, 25 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 100000000) + // Lets leave partition 4 with no data > 5001 + testUtils.sendMessages(topic, (100 to 110).map(_.toString).toArray, Some(3), Some(4000000)) + + val df = createDF(topic).where("timestamp > cast(5001 as TIMESTAMP)") + checkAnswer(df, (20 to 39).map(_.toString).toDF()) + } + + test("timestamp pushdown greaterThanEquals") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 100000000) + + val df = createDF(topic).where("timestamp >= cast(5001 as TIMESTAMP)") + checkAnswer(df, (10 to 39).map(_.toString).toDF()) + } + + test("timestamp pushdown lessThan") { + val topic = newTopic() + testUtils.createTopic(topic, 4) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 14, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 15 to 19, 3, 5001999) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 100000000) + // Lets leave partition 4 with no data < 5002000 + testUtils.sendMessages(topic, (100 to 110).map(_.toString).toArray, Some(3), Some(8000000)) + + val df = createDF(topic).where("timestamp < cast(5002 as TIMESTAMP)") + checkAnswer(df, (0 to 19).map(_.toString).toDF()) + } + + test("timestamp pushdown lessThanEquals") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 
9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 100000000) + val df = createDF(topic).where("timestamp <= cast(5002 as TIMESTAMP)") + checkAnswer(df, (0 to 29).map(_.toString).toDF()) + } + + test("timestamp pushdown lessThan and greaterThan") { + val topic = newTopic() + testUtils.createTopic(topic, 4) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 40 to 49, 3, 100000000) + + // Let's leave partition 4 with missing data for the query + testUtils.sendMessages(topic, (110 to 119).map(_.toString).toArray, Some(3), Some(4000000)) + val df = createDF(topic).where( + "timestamp > cast(5000 as TIMESTAMP) and timestamp < cast(5003 as TIMESTAMP)") + checkAnswer(df, (10 to 29).map(_.toString).toDF()) + } + + test("timestamp pushdown multiple conditions") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 40 to 49, 3, 100000000) + val df = createDF(topic).where( + """timestamp > cast(5000 as TIMESTAMP) and + |timestamp > cast(4000 as TIMESTAMP) and + |timestamp < cast(5003 as TIMESTAMP) and + |timestamp < cast(5002 as TIMESTAMP) and + |timestamp < cast(8000 as TIMESTAMP)""".stripMargin) + checkAnswer(df, (10 to 19).map(_.toString).toDF()) + } + + test("timestamp pushdown multiple conditions with simple or predicate") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 40 to 49, 3, 100000000) + val df = createDF(topic).where( + "timestamp < cast(5002 as TIMESTAMP) or timestamp > cast(5003 as TIMESTAMP)") + checkAnswer(df, ((0 to 19) ++ (40 to 49)).map(_.toString).toDF()) + } + + test("timestamp pushdown multiple conditions with complex or predicate") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 40 to 49, 3, 100000000) + val df = createDF(topic).where( + """(timestamp > cast(5000 as TIMESTAMP) and + |timestamp < cast(5002 as TIMESTAMP)) or + |(timestamp > cast(5002 as TIMESTAMP) and + |timestamp < cast(5004 as TIMESTAMP))""".stripMargin) + checkAnswer(df, ((10 to 19) ++ (30 to 39)).map(_.toString).toDF()) + } + + test("timestamp pushdown with contradictory condition") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + 
testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 40 to 49, 3, 100000000) + val df = createDF(topic).where( + """timestamp < cast(5002 as TIMESTAMP) and + |timestamp > cast(5002 as TIMESTAMP)""".stripMargin) + checkAnswer(df, spark.emptyDataFrame) + } + + test("timestamp pushdown equals") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 9, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 10 to 19, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 20 to 29, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 30 to 39, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 40 to 49, 3, 100000000) + val df = createDF(topic).where("timestamp = cast(5002 as TIMESTAMP)") + checkAnswer(df, (20 to 29).map(_.toString).toDF()) + } + + test("timestamp pushdown on unevenly distributed partitions") { + val topic = newTopic() + testUtils.createTopic(topic, 4) + testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0), Some(5000000)) + testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(1), Some(5005000)) + testUtils.sendMessages(topic, (20 to 24).map(_.toString).toArray, Some(2), Some(5002000)) + testUtils.sendMessages(topic, (30 to 34).map(_.toString).toArray, Some(3), Some(5003000)) + testUtils.sendMessages(topic, (35 to 39).map(_.toString).toArray, Some(3), Some(5004000)) + testUtils.sendMessages(topic, (40 to 49).map(_.toString).toArray, Some(0), Some(100000000)) + // Equals operator doesn't find the last element of the partition => we need to add 1 msg on top + testUtils.sendMessages(topic, (25 to 25).map(_.toString).toArray, Some(2), Some(5003000)) + + val df = createDF(topic).where("timestamp = cast(5002 as TIMESTAMP)") + checkAnswer(df, (20 to 24).map(_.toString).toDF()) + } + + test("timestamp pushdown with specific lower bound DDL offset limit") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + + testUtils.sendMessagesOverPartitions(topic, 0 to 14, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 15 to 29, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 30 to 44, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 45 to 59, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 60 to 74, 3, 100000000) + + val options = buildDDLOptions(topic, + Map(0 -> 15L, 1 -> 15L, 2 -> 15L), + Map(0 -> -1L, 1 -> -1L, 2 -> -1L) // latest + ) + val df = createDF(topic, withOptions = options) + .where("timestamp > cast(5000 as TIMESTAMP) and timestamp < cast(5004 as TIMESTAMP)") + // 45 (15 offset * 3 partitions) to 59 (constrained by timestamp < 5004000) + checkAnswer(df, (45 to 59).map(_.toString).toDF()) + } + + + test("timestamp pushdown on latest range") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + + testUtils.sendMessagesOverPartitions(topic, 0 to 14, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 15 to 29, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 30 to 44, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 45 to 59, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 60 to 74, 3, 100000000) + testUtils.sendMessagesOverPartitions(topic, 75 to 200, 3, 5005000) + + val options = buildDDLOptions(topic, + Map(0 -> 10L, 1 -> 10L, 2 -> 10L), + Map(0 -> -1L, 1 -> -1L, 2 -> 
-1L) // latest + ) + val df = createDF(topic, withOptions = options) + .where("timestamp > cast(5002 as TIMESTAMP)") + // 45 (by timestamp > 5002000) to 150 unbounded + checkAnswer(df, (45 to 200).map(_.toString).toDF()) + } + + + test("timestamp pushdown with specific upper bound DDL offset limit") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 4) + + val options = buildDDLOptions(topic, + Map(0 -> -2L, 1 -> -2L, 2 -> -2L, 3 -> -2L), // earliest + Map(0 -> 22L, 1 -> 22L, 2 -> 22L, 3 -> 22L) + ) + val df = createDF(topic, withOptions = options) + .where("timestamp > cast(5001 as TIMESTAMP)") + testUtils.sendMessagesOverPartitions(topic, 0 to 19, 4, 5000000) + testUtils.sendMessagesOverPartitions(topic, 20 to 39, 4, 5001000) + testUtils.sendMessagesOverPartitions(topic, 40 to 59, 4, 5002000) + testUtils.sendMessagesOverPartitions(topic, 60 to 79, 4, 5003000) + testUtils.sendMessagesOverPartitions(topic, 80 to 99, 4, 100000000) + // from 40 (by timestamp > 5001000) + // to 79 (20 offset * 4 partitions) + 2 remaining offset from each partition + checkAnswer(df, ((40 to 79) ++ (80 to 81) ++ (85 to 86) ++ (90 to 91) ++ (95 to 96)) + .map(_.toString).toDF()) + } + + test("timestamp pushdown on earliest range") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 4) + + val options = buildDDLOptions(topic, + Map(0 -> -2L, 1 -> -2L, 2 -> -2L, 3 -> -2L), // earliest + Map(0 -> 22L, 1 -> 22L, 2 -> 22L, 3 -> 22L) + ) + val df = createDF(topic, withOptions = options) + .where("timestamp < cast(5003 as TIMESTAMP)") + testUtils.sendMessagesOverPartitions(topic, 0 to 19, 4, 5000000) + testUtils.sendMessagesOverPartitions(topic, 20 to 39, 4, 5001000) + testUtils.sendMessagesOverPartitions(topic, 40 to 59, 4, 5002000) + testUtils.sendMessagesOverPartitions(topic, 60 to 79, 4, 5003000) + testUtils.sendMessagesOverPartitions(topic, 80 to 99, 4, 100000000) + // earliest to 59 (by timestamp < 5003000) + checkAnswer(df, (0 to 59).map(_.toString).toDF()) + } + + test("timestamp pushdown with specific offsets upper & lower bound") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 4) + val options = buildDDLOptions(topic, + Map(0 -> 7, 1 -> 7, 2 -> 7, 3 -> 7), + Map(0 -> 18L, 1 -> 18L, 2 -> 18L, 3 -> 18L) + ) + val df = createDF(topic, withOptions = options) + .where("timestamp > cast(5000 as TIMESTAMP) and timestamp < cast(5003 as TIMESTAMP)") + testUtils.sendMessagesOverPartitions(topic, 0 to 19, 4, 5000000) + testUtils.sendMessagesOverPartitions(topic, 20 to 39, 4, 5001000) + testUtils.sendMessagesOverPartitions(topic, 40 to 59, 4, 5002000) + testUtils.sendMessagesOverPartitions(topic, 60 to 79, 4, 5003000) + testUtils.sendMessagesOverPartitions(topic, 80 to 99, 4, 100000000) + + // start with 3 from each partition (by offset) to 59 (by timestamp < 5003000) + checkAnswer(df, ((22 to 24) ++ (27 to 29) ++ (32 to 34) ++ (37 to 39) ++ (40 to 59)) + .map(_.toString).toDF()) + } + + test("timestamp pushdown out of offset range") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 4) + val options = buildDDLOptions(topic, + Map(0 -> 7, 1 -> 7, 2 -> 7, 3 -> 7), + Map(0 -> 18L, 1 -> 18L, 2 -> 18L, 3 -> 18L) + ) + val df = createDF(topic, withOptions = options) + .where("timestamp > cast(5003 as TIMESTAMP) and timestamp < cast(5005 as TIMESTAMP)") + testUtils.sendMessagesOverPartitions(topic, 0 to 19, 4, 5000000) + testUtils.sendMessagesOverPartitions(topic, 20 to 39, 4, 5001000) + testUtils.sendMessagesOverPartitions(topic, 40 to 59, 4, 
5002000) + testUtils.sendMessagesOverPartitions(topic, 60 to 79, 4, 5003000) + testUtils.sendMessagesOverPartitions(topic, 80 to 99, 4, 100000000) + + checkAnswer(df, spark.emptyDataFrame) + } + + test("where query on partition condition") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 8, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 9 to 17, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 18 to 26, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 27 to 35, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 36 to 44, 3, 100000000) + val df = createDF(topic).where("timestamp > cast(5001 as TIMESTAMP) and partition = 0") + // first 3 elements from each batch + checkAnswer(df, ((18 to 20) ++ (27 to 29) ++ (36 to 38)).map(_.toString).toDF()) + } + + test("where query on offset condition") { + val topic = newTopic() + testUtils.createTopic(topic, 3) + testUtils.sendMessagesOverPartitions(topic, 0 to 8, 3, 5000000) + testUtils.sendMessagesOverPartitions(topic, 9 to 17, 3, 5001000) + testUtils.sendMessagesOverPartitions(topic, 18 to 26, 3, 5002000) + testUtils.sendMessagesOverPartitions(topic, 27 to 35, 3, 5003000) + testUtils.sendMessagesOverPartitions(topic, 36 to 44, 3, 100000000) + val df = createDF(topic).where("offset = 7") + checkAnswer(df, ((19 to 19) ++ (22 to 22) ++ (25 to 25)).map(_.toString).toDF()) + } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 70b6e67f80d5..a38390dae314 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -237,6 +237,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L } } + def sendMessagesOverPartitions(topic: String, msgs: Range, + partitionCount: Int, timestamp: Int) : Unit = { + val elementsCount = math.ceil(msgs.size.asInstanceOf[Double] / partitionCount).toInt + val msgsPartitioned = msgs.grouped(elementsCount).toList + msgsPartitioned.zipWithIndex.foreach{ case(msgPerPartition, partitionIndex) => + sendMessages(topic, msgPerPartition.map(_.toString).toArray, + Some(partitionIndex), Some(timestamp)) + } + } + /** Java-friendly function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) @@ -257,17 +267,22 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L def sendMessages( topic: String, messages: Array[String], - partition: Option[Int]): Seq[(String, RecordMetadata)] = { + partition: Option[Int], + timestamp: Option[Long] = None): Seq[(String, RecordMetadata)] = { producer = new KafkaProducer[String, String](producerConfiguration) val offsets = try { + val timestampJava: java.lang.Long = timestamp match { + case Some(ts) => ts + case _ => null + } messages.map { m => val record = partition match { - case Some(p) => new ProducerRecord[String, String](topic, p, null, m) - case None => new ProducerRecord[String, String](topic, m) + case Some(p) => new ProducerRecord[String, String](topic, p, timestampJava, null, m) + case None => new ProducerRecord[String, String](topic, null, timestampJava, null, m) } val metadata = 
producer.send(record).get(10, TimeUnit.SECONDS) - logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}") + logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}") (m, metadata) } } finally {
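End to end, the intended effect of this patch is that a batch Kafka read which filters on the `timestamp` column only scans the matching offset range of each partition instead of the whole topic. A rough usage sketch (the topic name, bootstrap servers and cutoff value below are placeholders, not taken from the patch):

// With timestamp pushdown, the filter below should be translated into per-partition
// offset ranges on the driver before the executors start consuming.
val events = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .load()
  .where("timestamp >= cast(1500000000 as TIMESTAMP)")
  .selectExpr("CAST(value AS STRING)", "timestamp", "partition", "offset")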