@@ -42,7 +42,7 @@ private[kafka010] case class CachedKafkaConsumer private(

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

private var consumer = createConsumer
var rawConsumer = createConsumer
Contributor: Exposing an internal var is generally not a good idea. A better approach would be to add the necessary methods (for which you need the consumer) to the CachedKafkaConsumer class itself.
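As an illustration of the comment above (the method names getEarliestOffset and getLatestOffset are hypothetical, not from this patch), the executor-side lookups could be exposed as small methods on CachedKafkaConsumer while the KafkaConsumer itself stays private; the class's own getAvailableOffsetRange(), shown further down in this diff, already computes both values.

    // Sketch only: keep the consumer private and expose just the offset
    // lookups that the late-binding code needs.
    private var consumer = createConsumer

    /** Earliest offset currently available for this topic partition. */
    def getEarliestOffset(): Long = {
      consumer.seekToBeginning(ju.Arrays.asList(topicPartition))
      consumer.position(topicPartition)
    }

    /** Latest offset currently available for this topic partition. */
    def getLatestOffset(): Long = {
      consumer.seekToEnd(ju.Arrays.asList(topicPartition))
      consumer.position(topicPartition)
    }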

Contributor: And why was it renamed?


/** Iterator to the already fetch data */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@@ -224,8 +224,8 @@ private[kafka010] case class CachedKafkaConsumer private(

/** Create a new consumer and reset cached states */
private def resetConsumer(): Unit = {
consumer.close()
consumer = createConsumer
rawConsumer.close()
rawConsumer = createConsumer
resetFetchedData()
}

@@ -271,15 +271,15 @@ private[kafka010] case class CachedKafkaConsumer private(
}
}

private def close(): Unit = consumer.close()
private def close(): Unit = rawConsumer.close()

private def seek(offset: Long): Unit = {
logDebug(s"Seeking to $groupId $topicPartition $offset")
consumer.seek(topicPartition, offset)
rawConsumer.seek(topicPartition, offset)
}

private def poll(pollTimeoutMs: Long): Unit = {
val p = consumer.poll(pollTimeoutMs)
val p = rawConsumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
fetchedData = r.iterator
@@ -290,10 +290,10 @@ private[kafka010] case class CachedKafkaConsumer private(
* and the latest offset.
*/
private def getAvailableOffsetRange(): (Long, Long) = {
consumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = consumer.position(topicPartition)
consumer.seekToEnd(Set(topicPartition).asJava)
val latestOffset = consumer.position(topicPartition)
rawConsumer.seekToBeginning(Set(topicPartition).asJava)
val earliestOffset = rawConsumer.position(topicPartition)
rawConsumer.seekToEnd(Set(topicPartition).asJava)
val latestOffset = rawConsumer.position(topicPartition)
(earliestOffset, latestOffset)
}
}
@@ -42,6 +42,11 @@ private[kafka010] trait KafkaOffsetReader {
*/
def close()

/**
* @return The Set of TopicPartitions for a given topic
*/
def fetchTopicPartitions(): Set[TopicPartition]

/**
* Set consumer position to specified offsets, making sure all assignments are set.
Contributor: These docs seem wrong. The name says it should fetch offsets, but the doc says it sets something?

Contributor: And what's the difference between earliest and starting offsets?

*/
@@ -108,7 +113,14 @@ private[kafka010] class KafkaOffsetReaderImpl(

def close(): Unit = consumer.close()

def fetchSpecificStartingOffsets(
override def fetchTopicPartitions(): Set[TopicPartition] = {
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.assignment().asScala.toSet
Member: nit: please also call pause, like this, to avoid fetching the real data when reusing the relation:
val partitions = consumer.assignment()
consumer.pause(partitions)
partitions.asScala.toSet

}

override def fetchSpecificStartingOffsets(
partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
@@ -131,7 +143,7 @@
}
}

def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
override def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
@@ -144,7 +156,7 @@
partitionOffsets
}

def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
override def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
@@ -157,7 +169,7 @@
partitionOffsets
}

def fetchNewPartitionEarliestOffsets(
override def fetchNewPartitionEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
if (newPartitions.isEmpty) {
Map.empty[TopicPartition, Long]
@@ -283,6 +295,13 @@ private[kafka010] class UninterruptibleKafkaOffsetReader(kafkaOffsetReader: Kafk
kafkaReaderThread.shutdownNow()
}

override def fetchTopicPartitions(): Set[TopicPartition] = {
val future = Future {
kafkaOffsetReader.fetchTopicPartitions()
}(execContext)
ThreadUtils.awaitResult(future, Duration.Inf)
}

override def fetchSpecificStartingOffsets(
partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
val future = Future {
@@ -58,7 +58,7 @@ private[kafka010] class KafkaRelation(
val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
// Obtain topicPartitions in both from and until partition offset, ignoring
// topic partitions that were added and/or deleted between the two above calls.
if (fromPartitionOffsets.keySet.size != untilPartitionOffsets.keySet.size) {
if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic())
val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
@@ -97,9 +97,27 @@ private[kafka010] class KafkaRelation(
sqlContext.internalCreateDataFrame(rdd, schema).rdd
}

private def getPartitionOffsets(kafkaOffsets: KafkaOffsets) = kafkaOffsets match {
case EarliestOffsets => kafkaReader.fetchEarliestOffsets()
case LatestOffsets => kafkaReader.fetchLatestOffsets()
case SpecificOffsets(p) => kafkaReader.fetchSpecificStartingOffsets(p)
private def getPartitionOffsets(kafkaOffsets: KafkaOffsets): Map[TopicPartition, Long] = {
def validateTopicPartitions(partitions: Set[TopicPartition],
partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
assert(partitions == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
"Use -1 for latest, -2 for earliest, if you don't care.\n" +
s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}")
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
partitionOffsets
}
val partitions = kafkaReader.fetchTopicPartitions()
// Obtain TopicPartition offsets with late binding support
kafkaOffsets match {
case EarliestOffsets => partitions.map {
case tp => tp -> -2L
}.toMap
case LatestOffsets => partitions.map {
case tp => tp -> -1L
}.toMap
case SpecificOffsets(partitionOffsets) =>
validateTopicPartitions(partitions, partitionOffsets)
}
}
}
@@ -123,7 +123,26 @@ private[kafka010] class KafkaSourceRDD(
override def compute(
thePart: Partition,
context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val range = thePart.asInstanceOf[KafkaSourceRDDPartition].offsetRange
var range = thePart.asInstanceOf[KafkaSourceRDDPartition].offsetRange
if (range.fromOffset < 0 || range.untilOffset < 0) {
// Late bind the offset range
val consumer = CachedKafkaConsumer.getOrCreate(
range.topic, range.partition, executorKafkaParams, reuseKafkaConsumer)
val fromOffset = if (range.fromOffset < 0) {
consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
consumer.rawConsumer.position(range.topicPartition)
} else {
range.fromOffset
}
val untilOffset = if (range.untilOffset < 0) {
consumer.rawConsumer.seekToEnd(ju.Arrays.asList(range.topicPartition))
consumer.rawConsumer.position(range.topicPartition)
} else {
range.untilOffset
}
range = KafkaSourceRDDOffsetRange(range.topicPartition,
fromOffset, untilOffset, range.preferredLoc)
}
assert(
range.fromOffset <= range.untilOffset,
s"Beginning offset ${range.fromOffset} is after the ending offset ${range.untilOffset} " +
@@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.kafka.log=DEBUG

@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.sql.test.SharedSQLContext

class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {

@@ -61,20 +61,19 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
testUtils.sendMessages(topic, Array("20"), Some(2))

// Specify explicit earliest and latest offset values
val reader = spark
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
var df = reader.selectExpr("CAST(value AS STRING)")
.selectExpr("CAST(value AS STRING)")
checkAnswer(df, (0 to 20).map(_.toString).toDF)

// "latest" should late bind to the current (latest) offset in the reader
// "latest" should late bind to the current (latest) offset in the df
testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, Some(2))
Member: nit: could you add the following test below this line to make the semantics clear?
    // The same DataFrame instance should return the same result
    checkAnswer(df, (0 to 20).map(_.toString).toDF)

Author: This no longer holds now that we're binding in the executor, right?

df = reader.selectExpr("CAST(value AS STRING)")
checkAnswer(df, (0 to 29).map(_.toString).toDF)
}

@@ -118,25 +117,23 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
new TopicPartition(topic, 2) -> 1L // explicit offset happens to = the latest
)
val endingOffsets = JsonUtils.partitionOffsets(endPartitionOffsets)
val reader = spark
val df = spark
Contributor: super nit: maybe this Kafka DataFrame creation could be put in a helper function; otherwise there's a lot of duplicate code. (An illustrative sketch follows after this test.)

.read
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", startingOffsets)
.option("endingOffsets", endingOffsets)
.load()
var df = reader.selectExpr("CAST(value AS STRING)")
.selectExpr("CAST(value as STRING)")
checkAnswer(df, (0 to 20).map(_.toString).toDF)

// static offset partition 2, nothing should change
testUtils.sendMessages(topic, (31 to 39).map(_.toString).toArray, Some(2))
df = reader.selectExpr("CAST(value AS STRING)")
checkAnswer(df, (0 to 20).map(_.toString).toDF)

// latest offset partition 1, should change
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(1))
df = reader.selectExpr("CAST(value AS STRING)")
checkAnswer(df, (0 to 30).map(_.toString).toDF)
}
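Picking up the "super nit" above about duplicated DataFrame creation, a shared helper along these lines could be added to this suite. The name createKafkaDF is illustrative only, not part of the patch, and it assumes org.apache.spark.sql.DataFrame is imported.

    // Sketch of a shared helper for these tests (illustrative only).
    private def createKafkaDF(topic: String, options: Map[String, String]): DataFrame = {
      spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
        .option("subscribe", topic)
        .options(options)
        .load()
        .selectExpr("CAST(value AS STRING)")
    }

    // Example usage:
    // val df = createKafkaDF(topic,
    //   Map("startingOffsets" -> "earliest", "endingOffsets" -> "latest"))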

@@ -147,18 +144,64 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
testUtils.sendMessages(topic, (0 to 10).map(_.toString).toArray, Some(0))

// Specify explicit earliest and latest offset values
val reader = spark
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
var df = reader.selectExpr("CAST(value AS STRING)")
.selectExpr("CAST(value AS STRING)")
checkAnswer(df.union(df), ((0 to 10) ++ (0 to 10)).map(_.toString).toDF)
}

test("test late binding start offsets") {
try {
// First, establish a new KafkaUtils instance that will clear
// all messages when cleanupLogs is called.
if (testUtils != null) {
testUtils.teardown()
testUtils = null
}
// The following settings will ensure that all log entries
// are removed following a call to cleanupLogs
val brokerProps = Map[String, Object](
"log.retention.bytes" -> 1.asInstanceOf[AnyRef], // retain nothing
"log.retention.ms" -> 1.asInstanceOf[AnyRef] // no wait time
)
testUtils = new KafkaTestUtils(withBrokerProps = Some(brokerProps))
testUtils.setup()

val topic = newTopic()
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0))
// Specify explicit earliest and latest offset values
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)")
checkAnswer(df, (0 to 9).map(_.toString).toDF)
// Blow away current set of messages.
testUtils.cleanupLogs()
// Add some more data, but do not call cleanup
testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(0))
// Ensure that we late bind to the new starting position
checkAnswer(df, (10 to 19).map(_.toString).toDF)
} finally {
if (testUtils != null) {
testUtils.teardown()
}
testUtils = new KafkaTestUtils
testUtils.setup()
}
}

test("bad source options") {
def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
val ex = intercept[IllegalArgumentException] {
@@ -50,7 +50,7 @@ import org.apache.spark.SparkConf
*
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
class KafkaTestUtils extends Logging {
class KafkaTestUtils(withBrokerProps: Option[Map[String, Object]] = None) extends Logging {

// Zookeeper related configurations
private val zkHost = "localhost"
@@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging {
offsets
}

def cleanupLogs(): Unit = {
server.logManager.cleanupLogs()
}

def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get latest offsets")
Member: nit: please fix the log message (this method fetches the earliest offsets, but the log says "latest").

kc.subscribe(topics.asJavaCollection)
kc.poll(0)
val partitions = kc.assignment()
kc.pause(partitions)
kc.seekToBeginning(partitions)
val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
kc.close()
logInfo("Closed consumer to get latest offsets")
Member: nit: same here, please fix the log message.

offsets
}

def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get latest offsets")
@@ -274,6 +292,11 @@ class KafkaTestUtils extends Logging {
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
withBrokerProps.map { p =>
Member: nit: you can change the type of withBrokerProps to Map[String, Object]; then here you can just use props.putAll(withBrokerProps.asJava). (An illustrative sketch follows below.)

p.foreach {
case (key, value) => props.put(key, value)
}
}
props
}
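A minimal sketch of the reviewer's putAll suggestion above (not part of this patch): take the broker overrides as a plain Map with an empty default instead of an Option, assuming the file's existing java.util.Properties and scala.collection.JavaConverters._ imports.

    // Illustrative constructor signature and merge, per the review comment above.
    class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {

      private def brokerConfiguration: Properties = {
        val props = new Properties()
        // ... existing broker settings elided ...
        props.putAll(withBrokerProps.asJava)
        props
      }
    }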
