Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ private[kafka010] class KafkaOffsetReader(
partitions.asScala.toSet
}

/**
 * Asks the Kafka consumer (`offsetsForTimes`) for the offset matching each partition's
 * timestamp.
 *
 * @param times timestamp to look up, per topic partition
 * @return the offset reported for every partition present in the consumer's answer; a
 *         partition whose answer is null is mapped to [[KafkaOffsetReader.EMPTY_OFFSET]]
 */
def fetchOffsetsByTime(times: Map[TopicPartition, Long]):
    Map[TopicPartition, Long] = runUninterruptibly {
  assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

  // The Java consumer API expects boxed java.lang.Long timestamps.
  val boxedTimes = times.map { case (tp, ts) => tp -> long2Long(ts) }.asJava
  val lookedUp = consumer.offsetsForTimes(boxedTimes).asScala

  lookedUp.map { case (tp, hit) =>
    // A null entry is translated to the EMPTY_OFFSET sentinel.
    val offset = Option(hit)
      .map(found => Long2long(found.offset()))
      .getOrElse(KafkaOffsetReader.EMPTY_OFFSET)
    tp -> offset
  }.toMap
}

/**
* Resolves the specific offsets based on Kafka seek positions.
* This method resolves offset value -1 to the latest and -2 to the
Expand Down Expand Up @@ -396,6 +406,8 @@ private[kafka010] class KafkaOffsetReader(
}

private[kafka010] object KafkaOffsetReader {
// offsets are not instances of Optional, we need special state for None
val EMPTY_OFFSET: Long = -100L

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow why this is needed. Normally, if fetchOffsetsByTime sometimes needs to return offsets and sometimes needs to return None, it should just return Option[Offset] values.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree with you (this is what the comment is trying to express). I tried to go with least invasive changes, but since you've pointed this out, I can change the offset map everywhere from:
Map[TopicPartition, Long] to Map[TopicPartition, Option[Long]]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it needs to be an Option[Long] everywhere. Just until we decide what value we want to pass to Kafka.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Option[Long] where it was necessary


def kafkaSchema: StructType = StructType(Seq(
StructField("key", BinaryType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

package org.apache.spark.sql.kafka010

import java.sql.Timestamp
import java.util.UUID

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.kafka010.KafkaOffsetReader.EMPTY_OFFSET
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String


Expand All @@ -39,7 +42,7 @@ private[kafka010] class KafkaRelation(
failOnDataLoss: Boolean,
startingOffsets: KafkaOffsetRangeLimit,
endingOffsets: KafkaOffsetRangeLimit)
extends BaseRelation with TableScan with Logging {
extends BaseRelation with PrunedFilteredScan with Logging {
assert(startingOffsets != LatestOffsetRangeLimit,
"Starting offset not allowed to be set to latest offsets.")
assert(endingOffsets != EarliestOffsetRangeLimit,
Expand All @@ -54,7 +57,7 @@ private[kafka010] class KafkaRelation(

override def schema: StructType = KafkaOffsetReader.kafkaSchema

override def buildScan(): RDD[Row] = {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
Expand All @@ -69,8 +72,9 @@ private[kafka010] class KafkaRelation(
// Leverage the KafkaReader to obtain the relevant partition offsets
val (fromPartitionOffsets, untilPartitionOffsets) = {
try {
(getPartitionOffsets(kafkaOffsetReader, startingOffsets),
getPartitionOffsets(kafkaOffsetReader, endingOffsets))
val start = getStartingPartitionOffsets(kafkaOffsetReader, filters)
val end = getEndingPartitionOffsets(kafkaOffsetReader, filters)
invalidateEmptyOffsets(start, end)
} finally {
kafkaOffsetReader.close()
}
Expand All @@ -90,10 +94,12 @@ private[kafka010] class KafkaRelation(
// Calculate offset ranges
val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
val fromOffset = fromPartitionOffsets.getOrElse(tp,
// This should not happen since topicPartitions contains all partitions not in
// fromPartitionOffsets
throw new IllegalStateException(s"$tp doesn't have a from offset"))
val untilOffset = untilPartitionOffsets(tp)
// This should not happen since topicPartitions contains all partitions not in
// fromPartitionOffsets
throw new IllegalStateException(s"$tp doesn't have a from offset")
}
var untilOffset = untilPartitionOffsets(tp)
untilOffset = if (areOffsetsInLine(fromOffset, untilOffset)) untilOffset else fromOffset

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem safe. We should avoid generating nonsensical ranges in the first place, rather than generating them and then silently clamping them down.

@tomasbartalos tomasbartalos Mar 18, 2019

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nonsensical ranges originate from wrong (contradictory) user queries (example: timestamp > 10 and timestamp < 10). The question is how you want to react to this kind of query.

  1. If we don't handle them then user will see an error:
    You either provided an invalid fromOffset, or the Kafka topic has been damaged
  2. If we do handle them, user will get empty result set and no error.

I'm more a fan of option 2, since this is how most databases would react, but if you disagree I can delete the handling.
Maybe the method name areOffsetsInLine could be improved?

If I comment out the line
untilOffset = if (areOffsetsInLine(fromOffset, untilOffset)) untilOffset else fromOffset
then 2 unit tests fail:

  • timestamp pushdown with contradictory condition - query like timestamp > 10 and timestamp < 10
  • timestamp pushdown out of offset range - this is for cases where DS option specifies offset range and the timestamp filter is valid but out of DS offset range.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose option 2 really is the only good choice here. But let's add a warning log for this case, saying what the original range was and what user predicates made us clamp it to empty set.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Warning log message. This message will be printed for each partition that was clamped to empty set

KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, None)
}.toArray

Expand All @@ -106,19 +112,49 @@ private[kafka010] class KafkaRelation(
val rdd = new KafkaSourceRDD(
sqlContext.sparkContext, executorKafkaParams, offsetRanges,
pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
InternalRow(
cr.key,
cr.value,
UTF8String.fromString(cr.topic),
cr.partition,
cr.offset,
DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
cr.timestampType.id)
val columns = requiredColumns.map{KafkaRelation.columnToValueExtractor(_)(cr)}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is on the fast path. I don't think we should add 2 lambda invocations and a HashMap lookup to every row computation. I'd prefer just having the test harness create its own extractor if it needs one.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what you mean by

I'd prefer just having the test harness create its own extractor if it needs one

Could you please elaborate more on that ?

The relation type was changed from TableScan to PrunedFilteredScan and so I need to return only required columns (not all columns as before), thus extracting only specific fields from ConsumerRecord.

I was able to come up with alternative solution:

  /**
   * Extracts the requested column values from a single Kafka consumer record.
   */
  class ConsumerRecordInspector(cr: ConsumerRecord[Array[Byte], Array[Byte]]) {
    /**
     * Returns one value per entry of `requiredColumns`, in the same order.
     *
     * The list is consumed head-first: each case converts the head column and recurses on
     * the rest, so the recursive call is not in tail position.
     *
     * NOTE(review): there is no default case, so a column name other than the seven
     * handled below raises a scala.MatchError.
     */
    def getValues(requiredColumns: List[Any]) : Seq[Any] = {
      requiredColumns match {
        case "key"::rest => cr.key +: getValues(rest)
        case "value"::rest => cr.value +: getValues(rest)
        case "topic"::rest => UTF8String.fromString(cr.topic) +: getValues(rest)
        case "partition"::rest => cr.partition +: getValues(rest)
        case "offset"::rest => cr.offset +: getValues(rest)
        case "timestamp"::rest => DateTimeUtils.
          fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)) +: getValues(rest)
        case "timestampType"::rest => cr.timestampType.id +: getValues(rest)
        // End of the column list: terminates the recursion.
        case Seq() => Seq.empty
      }
    }
  }

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lambda invocation replaced with recursion.
Single-core (i7) performance test on my laptop for 1 billion records:

  • recursion = 2m 38s
  • lambdas = 3m 47s

InternalRow.fromSeq(columns)
}
val schemaProjected = StructType(requiredColumns.map{schema(_)})
sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schemaProjected).rdd
}

def invalidateEmptyOffsets(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method needs docs - I don't really understand what it's doing.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First I'll elaborate on why we need an empty offset. When we request a mapping of timestamps to Kafka offsets, Kafka may return null for some partitions. This means that the partition doesn't contain any record whose timestamp is greater than or equal to the given timestamp. I need to handle this situation and transform null into something meaningful (currently a constant; it will be changed to None as you've proposed).

The above situation may happen for the calculated startingOffsets or endingOffsets. As a result, we have to invalidate every partition whose startOffsets or endOffsets are empty, i.e. set its offset range to (0,0).

Should I just add a comment to the method ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docs

startOffset: Map[TopicPartition, Long],
endOffset: Map[TopicPartition, Long]):
(Map[TopicPartition, Long], Map[TopicPartition, Long]) = {

val merged = startOffset.map { case (k, v) => k -> ((v, endOffset(k)))}
val invalidated = merged.map {
case(k, (start, end)) if start != EMPTY_OFFSET && end != EMPTY_OFFSET =>
k -> ((start, end))
case(k, _) => k -> ((0L, 0L))
}
sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema).rdd
(invalidated.map{case(k, (start, _)) =>
k -> start}, invalidated.map{case(k, (_, end)) => k -> end})
}

/**
 * Tells whether an offset pair can be used as-is.
 *
 * A pair is accepted when either offset is negative, or when `untilOffset` is strictly
 * greater than `fromOffset`.
 * NOTE(review): negative offsets are presumably the special latest/earliest markers
 * (-1 / -2) - confirm against KafkaOffsetRangeLimit.
 */
private def areOffsetsInLine(fromOffset: Long, untilOffset: Long): Boolean = {
  val hasNegativeOffset = fromOffset < 0 || untilOffset < 0
  hasNegativeOffset || fromOffset < untilOffset
}

private def getEndingPartitionOffsets(
kafkaReader: KafkaOffsetReader,
filters: Array[Filter]): Map[TopicPartition, Long] = {

val offsetsByLimit = getPartitionOffsetsByRangeLimit(kafkaReader, endingOffsets)
getEndingPartitionOffsetsByFilter(kafkaReader, offsetsByLimit, filters)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the naming quite makes sense to me here. It's weird to have to apply multiple levels of "getPartitionOffsets" before obtaining the actual partition offsets that should be used.

@tomasbartalos tomasbartalos Mar 18, 2019

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably the method naming getPartitionOffsetsByRangeLimit is confusing.

We can push down only offsets to Kafka, so what we need to do is merge the offsets specified as DS options (startingOffsets, endingOffsets) with the offsets obtained from the timestamp filter.

Example: DS option have offsets range of 100 - 200
startingOffsets '{"topic" : {"0" : 100}}', endingOffsets '{"topic" : {"0" : 200}}');
but timestamp pushdown where timestamp > x and timestamp < y have offset range of let's say 150 - 300

The merge result is highest of starting to lowest of ending = 150 - 200

I think getEndingPartitionOffsetsByFilter does what it says, but it's worth renaming getPartitionOffsetsByRangeLimit. What do you think would be a good name?
Maybe: getPartitionOffsetsFromDSOption ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I see "getPartitionOffsetsByA" and "getPartitionOffsetsByB", I normally expect those to be two independent strategies A and B for getting the final offsets. If they have to be chained together in some specific order, I don't think these are appropriate method names; they should be renamed or inlined.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code dealing with offset calculation was extracted to separate class. Thus most of the methods could become single argument ones and the dependency between calculation strategies could be emphasised.

}

/**
 * Computes the starting offset of every partition: the offsets derived from the
 * `startingOffsets` range limit are resolved first and then handed to
 * `getStartingPartitionOffsetsByFilter` together with the pushed-down filters.
 *
 * @param kafkaReader reader used for both lookups
 * @param filters     filters pushed down to this relation
 */
private def getStartingPartitionOffsets(
    kafkaReader: KafkaOffsetReader,
    filters: Array[Filter]): Map[TopicPartition, Long] =
  getStartingPartitionOffsetsByFilter(
    kafkaReader,
    getPartitionOffsetsByRangeLimit(kafkaReader, startingOffsets),
    filters)

private def getPartitionOffsets(
private def getPartitionOffsetsByRangeLimit(
kafkaReader: KafkaOffsetReader,
kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = {
def validateTopicPartitions(partitions: Set[TopicPartition],
Expand All @@ -144,6 +180,81 @@ private[kafka010] class KafkaRelation(
}
}

private val TIMESTAMP_ATTR = "timestamp"

/**
 * Combines the starting offsets coming from the range limit with the offsets Kafka
 * reports for lower-bound filters on the timestamp column.
 *
 * Handled filters on `TIMESTAMP_ATTR`: `GreaterThan` (looked up at bound + 1 ms),
 * `GreaterThanOrEqual` and `EqualTo` (looked up at the bound itself). Every other filter
 * is ignored. When several filters match, the lookup of the last one wins per partition.
 *
 * @param kafkaReader  reader used for the timestamp-to-offset lookups
 * @param limitOffsets starting offset per partition as given by the range limit
 * @param filters      filters pushed down to this relation
 * @return per partition: the limit offset when no timestamp lookup applies, EMPTY_OFFSET
 *         when the lookup produced EMPTY_OFFSET, otherwise the larger of the two offsets
 */
private def getStartingPartitionOffsetsByFilter(
    kafkaReader: KafkaOffsetReader,
    limitOffsets: Map[TopicPartition, Long],
    filters: Array[Filter]): Map[TopicPartition, Long] = {

  // Looks up the offsets at `bound` (+ shiftMs) for every partition of limitOffsets.
  def offsetsAt(bound: Any, shiftMs: Long): Map[TopicPartition, Long] = {
    val times = limitOffsets.map { case (tp, _) =>
      tp -> (bound.asInstanceOf[Timestamp].getTime + shiftMs)
    }
    kafkaReader.fetchOffsetsByTime(times)
  }

  // Filters are visited in order; later lookups overwrite earlier ones per partition.
  val timeOffsets = filters.foldLeft(Map.empty[TopicPartition, Long]) {
    case (acc, GreaterThan(TIMESTAMP_ATTR, bound)) => acc ++ offsetsAt(bound, 1L)
    case (acc, EqualTo(TIMESTAMP_ATTR, bound)) => acc ++ offsetsAt(bound, 0L)
    case (acc, GreaterThanOrEqual(TIMESTAMP_ATTR, bound)) => acc ++ offsetsAt(bound, 0L)
    case (acc, _) => acc
  }

  limitOffsets.map { case (tp, limit) =>
    val resolved = timeOffsets.get(tp).fold(limit) { byTime =>
      if (byTime == EMPTY_OFFSET) EMPTY_OFFSET else math.max(limit, byTime)
    }
    tp -> resolved
  }
}

/**
 * Combines the ending offsets coming from the range limit with the offsets Kafka reports
 * for upper-bound filters on the timestamp column.
 *
 * Handled filters on `TIMESTAMP_ATTR`: `LessThan` (looked up at the bound itself),
 * `LessThanOrEqual` and `EqualTo` (looked up at bound + 1 ms). Every other filter is
 * ignored. When several filters match, the lookup of the last one wins per partition.
 *
 * @param kafkaReader  reader used for the timestamp-to-offset lookups
 * @param limitOffsets ending offset per partition as given by the range limit
 * @param filters      filters pushed down to this relation
 * @return per partition: the timestamp-derived offset (or the limit when there is none);
 *         when `isLimitSpecified` accepts the limit, the result is additionally capped by
 *         the limit unless it is EMPTY_OFFSET
 */
private def getEndingPartitionOffsetsByFilter(
    kafkaReader: KafkaOffsetReader,
    limitOffsets: Map[TopicPartition, Long],
    filters: Array[Filter]): Map[TopicPartition, Long] = {

  // Looks up the offsets at `bound` (+ shiftMs) for every partition of limitOffsets.
  def offsetsAt(bound: Any, shiftMs: Long): Map[TopicPartition, Long] = {
    val times = limitOffsets.map { case (tp, _) =>
      tp -> (bound.asInstanceOf[Timestamp].getTime + shiftMs)
    }
    kafkaReader.fetchOffsetsByTime(times)
  }

  // Filters are visited in order; later lookups overwrite earlier ones per partition.
  val timeOffsets = filters.foldLeft(Map.empty[TopicPartition, Long]) {
    case (acc, LessThan(TIMESTAMP_ATTR, bound)) => acc ++ offsetsAt(bound, 0L)
    case (acc, LessThanOrEqual(TIMESTAMP_ATTR, bound)) => acc ++ offsetsAt(bound, 1L)
    case (acc, EqualTo(TIMESTAMP_ATTR, bound)) => acc ++ offsetsAt(bound, 1L)
    case (acc, _) => acc
  }

  limitOffsets.map { case (tp, limit) =>
    val candidate = timeOffsets.getOrElse(tp, limit)
    val resolved =
      if (!isLimitSpecified(limit) || candidate == EMPTY_OFFSET) candidate
      else Math.min(limit, candidate)
    tp -> resolved
  }
}

private def isLimitSpecified(offset: Long): Boolean = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the connection between the name and implementation here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it may seem confusing. The thing is that a Kafka offset can carry the special flags LATEST = -1 and EARLIEST = -2. The method checks whether the offset is bounded by a specific range rather than unbounded (latest, earliest). Honestly, I have renamed this about 3 times and I'm still not satisfied, but I can't find anything more meaningful.
Maybe isNotLatestOrEarliest would be better ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed method name

offset >= 0
}

/** Describes this relation by its strategy and its starting and ending offset limits. */
override def toString: String =
s"KafkaRelation(strategy=$strategy, start=$startingOffsets, end=$endingOffsets)"
}

object KafkaRelation {
  /** Function that reads one column value out of a raw Kafka consumer record. */
  private type ValueExtractor = ConsumerRecord[Array[Byte], Array[Byte]] => Any

  /** Extractor for each supported column, keyed by the column name. */
  private val columnToValueExtractor = Map[String, ValueExtractor](
    ("key", _.key),
    ("value", _.value),
    ("topic", record => UTF8String.fromString(record.topic)),
    ("partition", _.partition),
    ("offset", _.offset),
    ("timestamp", record =>
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp))),
    ("timestampType", _.timestampType.id)
  )
}
Loading