[SPARK-18682][SS] Batch Source for Kafka #16686
Conversation
@@ -1 +1 @@
-org.apache.spark.sql.kafka010.KafkaSourceProvider
+org.apache.spark.sql.kafka010.KafkaProvider
Hi @tcondie, I just happened to look at this PR. I wonder whether this breaks existing code that uses .format("org.apache.spark.sql.kafka010.KafkaSourceProvider"), although almost no users refer to it by that name.
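For context, a minimal sketch of the two ways existing code can name the source (assuming a SparkSession in scope; the server and topic strings are placeholders). Only the second, fully-qualified spelling would be affected by the rename:

// Short name, resolved through DataSourceRegister; unaffected by a class rename.
val byShortName = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .load()

// Fully-qualified provider class name; this spelling breaks if the class is renamed.
val byClassName = spark.readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .load()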
Test build #71894 has finished for PR 16686 at commit

Test build #71942 has finished for PR 16686 at commit
zsxwing
left a comment
Made one pass. There are two major issues:
- KafkaRelation may be reused (e.g., df.union(df)) and break CachedKafkaConsumer's assumptions. We can add a flag to not use the cached consumer (see the sketch below).
- Don't change the KafkaSourceProvider name.
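A minimal sketch of how the flag might look at the two call sites (the reuseKafkaConsumer name and the extra constructor argument follow the diffs quoted later in this thread; the val names are illustrative only):

// Batch relation: a DataFrame can be reused (e.g. df.union(df)), so two tasks may read
// the same topic partition concurrently; do not share cached consumers.
val batchRdd = new KafkaSourceRDD(
  sqlContext.sparkContext, executorKafkaParams, offsetRanges,
  pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false)

// Streaming source: each (group, topic, partition) is read by at most one task per batch,
// so reusing the cached consumer is safe and avoids reconnecting every batch.
val streamingRdd = new KafkaSourceRDD(
  sc, executorKafkaParams, offsetRanges,
  pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = true)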
| def close()
|
| def fetchSpecificStartingOffsets(
nit: could you add comments for these methods?
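For example, something along these lines (the signature here is an assumption based on the surrounding diff; only the comment itself is the point):

/**
 * Resolves the user-specified per-partition starting offsets against the partitions
 * Kafka currently reports, replacing the special earliest/latest values with
 * concrete offsets.
 */
def fetchSpecificStartingOffsets(
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]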
| /**
|  * The Kafka Consumer must be called in an UninterruptibleThread. This naturally occurs
|  * in Spark Streaming, but not in Spark SQL, which will use this call to communicate
nit: Spark Streaming -> Structured Streaming
| private[kafka010] class UninterruptibleKafkaOffsetReader(kafkaOffsetReader: KafkaOffsetReader)
|   extends KafkaOffsetReader with Logging {
|
| private class KafkaOffsetReaderThread extends UninterruptibleThread("Kafka Offset Reader")
This must be a daemon thread.
Actually, you can create the ExecutionContext using the following simple code:

val kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new UninterruptibleThread("Kafka Offset Reader")
    t.setDaemon(true)
    t
  }
})
val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)

// Close
kafkaReaderThread.shutdownNow()
| import KafkaSourceProvider._
| // Used to check parameters for different source modes
| private sealed trait Mode
nit: could you move these classes and deserClassName to object KafkaProvider?
| .option("startingOffsets", "earliest") | ||
| .option("endingOffsets", "latest") | ||
| .load() | ||
| assert(reader.count() === 21) |
You can extend QueryTest rather than SparkFunSuite to use checkAnswer like this:

var df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING)")
checkAnswer(df, (0 to 20).map(_.toString).toDF)
| offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
|
| // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
| val rdd = new KafkaSourceRDD(
I found that df.union(df) will just union the same RDD, which breaks the group id assumption: the same CachedKafkaConsumer will be used by two different tasks. For batch queries, caching consumers is not necessary. Could you add a flag to KafkaSourceRDD to not use the cached consumer? It's better to also write a test to cover this case. In addition, the test should use only one partition in order to launch two tasks from different RDDs at the same time: TestSparkSession uses local[2], so it can only run two tasks at the same time.
| val preferredLoc = if (numExecutors > 0) {
|   // This allows cached KafkaConsumers in the executors to be re-used to read the same
|   // partition in every batch.
|   Some(sortedExecutors(KafkaUtils.floorMod(tp.hashCode, numExecutors)))
You don't need to set the preferred locations after changing to not use the cached consumers.
| val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
| // Obtain topicPartitions in both from and until partition offset, ignoring
| // topic partitions that were added and/or deleted between the two above calls.
| val topicPartitions = fromPartitionOffsets.keySet.intersect(untilPartitionOffsets.keySet)
It's better to throw an exception rather than ignoring the deleted partitions.
| endingOffsets: KafkaOffsets)
|   extends BaseRelation with TableScan with Logging {
|
| require(startingOffsets != LatestOffsets,
nit: change it to assert since the parameters have already been validated.
Test build #71954 has finished for PR 16686 at commit
zsxwing
left a comment
Looks good overall except for some nits.
| testUtils.sendMessages(topic, (0 to 10).map(_.toString).toArray, Some(0))
|
| // Ensure local[2] so that two tasks will execute the query on one partition
| val testSession = new TestSparkSession(sparkContext)
nit: you don't need to create a TestSparkSession. I meant that this test already uses TestSparkSession, which uses local[2].
| .load()
| var df = reader.selectExpr("CAST(value AS STRING)")
| checkAnswer(df.union(df),
|   (0 to 10).map(_.toString).toDF.union((0 to 10).map(_.toString).toDF))
nit: ((0 to 10) ++ (0 to 10)).map(_.toString).toDF.
| "for starting and ending offsets") | ||
| } | ||
|
|
||
| val sortedExecutors = KafkaUtils.getSortedExecutorList(sqlContext.sparkContext) |
nit: not used any more (these 3 lines)
| val kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory {
|   override def newThread(r: Runnable): Thread = {
|     logInfo("NEW UNINTERRUPTIBLE THREAD KAFKA OFFSET")
nit: remove this debug log.
| kafkaOffsetReader.fetchNewPartitionEarliestOffsets(newPartitions)
| }(execContext)
| ThreadUtils.awaitResult(future, Duration.Inf)
|
nit: empty line
| // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
| val rdd = new KafkaSourceRDD(
|   sqlContext.sparkContext, executorKafkaParams, offsetRanges,
|   pollTimeoutMs, failOnDataLoss, false).map { cr =>
nit: false -> reuseKafkaConsumer = false
| // Obtain topicPartitions in both from and until partition offset, ignoring
| // topic partitions that were added and/or deleted between the two above calls.
| if (fromPartitionOffsets.keySet.size != untilPartitionOffsets.keySet.size) {
|   throw new IllegalStateException("Kafka return different topic partitions " +
nit: please include fromPartitionOffsets and untilPartitionOffsets in the exception message so that it's easy to debug such failures.
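Something along these lines, for example (a sketch only; the exact wording and whether to compare sizes or key sets is up to the author):

if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
  throw new IllegalStateException(
    "Kafka returned different topic partitions for starting and ending offsets: " +
      s"starting: $fromPartitionOffsets, ending: $untilPartitionOffsets")
}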
| // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
| val rdd = new KafkaSourceRDD(
|   sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
|   sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss, true).map { cr =>
nit: true -> reuseKafkaConsumer = true
Test build #71957 has finished for PR 16686 at commit

Test build #71997 has finished for PR 16686 at commit

Test build #72045 has finished for PR 16686 at commit
| checkAnswer(df, (0 to 20).map(_.toString).toDF)
|
| // "latest" should late bind to the current (latest) offset in the reader
| testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, Some(2))
nit: could you add the following test below this line to make the semantics clear?
// The same DataFrame instance should return the same result
checkAnswer(df, (0 to 20).map(_.toString).toDF)
This no longer holds now that we're binding in the executor, right?
Test build #72077 has finished for PR 16686 at commit

Test build #72081 has finished for PR 16686 at commit

Test build #72085 has finished for PR 16686 at commit

jenkins retest this please
zsxwing
left a comment
Could you also add LATEST and EARLIEST to KafkaUtils and replace the magic numbers -1 and -2? Sorry that I didn't bring this up earlier.
| props.put("log.flush.interval.messages", "1")
| props.put("replica.socket.timeout.ms", "1500")
| props.put("delete.topic.enable", "true")
| withBrokerProps.map { p =>
nit: you can change the type of withBrokerProps to Map[String, Object]. Then here you can just use props.putAll(withBrokerProps.asJava).
| def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
|   val kc = new KafkaConsumer[String, String](consumerConfiguration)
|   logInfo("Created consumer to get latest offsets")
nit: please fix the log message (it says "latest", but this method fetches the earliest offsets).
| kc.seekToBeginning(partitions)
| val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
| kc.close()
| logInfo("Closed consumer to get latest offsets")
nit: please fix the log message here as well.
| if (range.fromOffset < 0 || range.untilOffset < 0) {
|   // Late bind the offset range
|   val fromOffset = if (range.fromOffset < 0) {
|     consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
nit: add assert(range.fromOffset == -2) to avoid breaking it in the future.
| range.fromOffset
| }
| val untilOffset = if (range.untilOffset < 0) {
|   consumer.rawConsumer.seekToEnd(ju.Arrays.asList(range.topicPartition))
nit: add assert(range.untilOffset == -1) to avoid breaking it in the future.
| assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
| // Poll to get the latest assigned partitions
| consumer.poll(0)
| consumer.assignment().asScala.toSet
nit: please also call pause like this to avoid fetching the real data when reusing the relation.
val partitions = consumer.assignment()
consumer.pause(partitions)
partitions.asScala.toSet
Test build #72210 has finished for PR 16686 at commit

Test build #72211 has finished for PR 16686 at commit
zsxwing
left a comment
LGTM
tdas
left a comment
Overall it looks fine, but it needs some work on the code organization. I believe we can reduce the number of classes and LOC quite a bit.
| private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
|
| private var consumer = createConsumer
| var rawConsumer = createConsumer
Exposing an internal var is generally not a good idea. A better approach is to add the necessary methods (for which you need the consumer) to the class CachedKafkaConsumer.
And why was it renamed?
| val consumer = CachedKafkaConsumer.getOrCreate(
|   range.topic, range.partition, executorKafkaParams)
|   range.topic, range.partition, executorKafkaParams, reuseKafkaConsumer)
| if (range.fromOffset < 0 || range.untilOffset < 0) {
nit: Does this piece of code that resolves the range need to be inside the NextIterator? It causes a lot of unnecessary nesting. Instead of making the range a var, you can resolve the range above and then create the NextIterator.
Furthermore, why use rawConsumer directly and expose it? Why not use CachedKafkaConsumer.getAvailableOffsetRange()?
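For reference, a sketch of what such a method on CachedKafkaConsumer could look like (AvailableOffsetRange and the exact body are assumptions; consumer and topicPartition are the class's existing fields, and scala.collection.JavaConverters._ is assumed to be imported):

/** The earliest and latest offsets currently available for this topic partition. */
case class AvailableOffsetRange(earliest: Long, latest: Long)

def getAvailableOffsetRange(): AvailableOffsetRange = {
  consumer.seekToBeginning(Set(topicPartition).asJava)
  val earliest = consumer.position(topicPartition)
  consumer.seekToEnd(Set(topicPartition).asJava)
  val latest = consumer.position(topicPartition)
  AvailableOffsetRange(earliest, latest)
}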
Reworked it. Let me know what you think.
| partition: Int,
| kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
| kafkaParams: ju.Map[String, Object],
| reuse: Boolean): CachedKafkaConsumer = synchronized {
Does this mean reuse an existing one, or allow reuse in the future?
reuse existing. I changed the name to reuseExistingIfPresent.
| import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
|
| private[kafka010] trait KafkaOffsetReader
Please add Scaladocs.
This trait is a little weird. fetchTopicPartitions() fetches the topics and partitions of what?
Clarifying these in the docs would be good.
| def fetchTopicPartitions(): Set[TopicPartition]
|
| /**
|  * Set consumer position to specified offsets, making sure all assignments are set.
This doc seems wrong. The name says it should fetch offsets, but the doc says it sets something?
And what's the difference between earliest and starting offsets?
|  * by the Map that is passed to the function.
|  */
| override def createRelation(
| sqlContext: SQLContext,
incorrect indentation.
| .build()
|
| private def kafkaParamsForExecutors(
| specifiedKafkaParams: Map[String, String], uniqueGroupId: String) =
incorrect indentation.
Also, the convention is to have each param on a separate line.
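i.e. roughly (a sketch of the formatting convention only; the body is elided):

private def kafkaParamsForExecutors(
    specifiedKafkaParams: Map[String, String],
    uniqueGroupId: String) = {
  // ... unchanged body ...
}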
| private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
|
| // Used to check parameters for different source modes
| private sealed trait Mode
Commented elsewhere, Mode should not be required.
| private[kafka010] object KafkaUtils {
|
| // Used to denote unbounded offset positions
| val LATEST = -1L
Having these constants here does not make sense. Better to have them in an object KafkaOffsets and put these numbers there.
| import org.apache.spark.SparkContext
| import org.apache.spark.scheduler.ExecutorCacheTaskLocation
|
| private[kafka010] object KafkaUtils
I don't see the need for this class. LATEST and EARLIEST are better put in object KafkaOffsets (the trait already exists), and the other methods used to be part of KafkaSource and may continue to live there (unless anybody else uses them). See the sketch below.
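A sketch of what that could look like (the -1/-2 sentinels are the ones already used in the diffs above):

private[kafka010] object KafkaOffsets {
  // Sentinels used to represent offset limits that are resolved later,
  // following the Kafka consumer convention: -1 = latest, -2 = earliest.
  val LATEST = -1L
  val EARLIEST = -2L
}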
Test build #72294 has finished for PR 16686 at commit
tdas
left a comment
Last refactoring looks pretty good. Just a few more nits.
| val warningMessage =
| s"""
| |The current available offset range is [$earliestOffset, $latestOffset).
| |The current available offset range is [${range.earliest}, ${range.latest}).
nit: offset range is $range
| s""" | ||
| |The current available offset range is [$earliestOffset, $latestOffset). | ||
| | Offset ${offset} is out of range, and records in [$offset, $earliestOffset) will be | ||
| |The current available offset range is [${range.earliest}, ${range.latest}). |
nit: same as above.
| private[kafka010] case object EarliestOffsets extends StartingOffsets
| /**
|  * Bind to the earliest offsets in Kafka
nit: better docs. This is an object, not a method, so say what the object represents. "Bind to earliest offsets..." reads like docs for a method.
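For instance (just a wording suggestion for the Scaladoc; the declaration itself stays as in the diff above):

/**
 * Represents binding to the earliest offsets available in Kafka; the concrete
 * offsets are resolved only when the query or relation actually reads the data.
 */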
| partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsets
|
| private[kafka010] object KafkaOffsets {
| // Used to denote unbounded offset positions
nit: "Used to represent unresolved offset limits as longs".
"unbounded" sounds like it's infinite, or something like that.
| private[kafka010] class KafkaRelation(
| override val sqlContext: SQLContext,
incorrect indents
| }
|
| /**
|  * Fetch the earliest offsets of partitions.
Can you specify which partitions? Maybe "offsets of all partitions to be consumed according to the consumer strategy".
Same for the docs of other methods that do not take a specific list of partitions.
| }
| }
|
| private def runUninterruptibly[T](body: => T): T = {
nit: add docs.
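For example, something like this (a sketch of both the doc and a plausible body, reusing the execContext / UninterruptibleThread setup discussed earlier in this thread; the actual implementation may differ, and scala.concurrent.Future, Duration, ThreadUtils, and UninterruptibleThread are assumed to be imported):

/**
 * Runs `body` on the dedicated UninterruptibleThread when the caller is not already
 * on one. The Kafka consumer must only be used from an UninterruptibleThread,
 * otherwise a thread interrupt can leave it in an undefined state.
 */
private def runUninterruptibly[T](body: => T): T = {
  if (!Thread.currentThread.isInstanceOf[UninterruptibleThread]) {
    val future = Future { body }(execContext)
    ThreadUtils.awaitResult(future, Duration.Inf)
  } else {
    body
  }
}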
| }
|
| /**
|  * Helper function that does multiple retries on the a body of code that returns offsets.
nit: typo: "on the a body".
| "log.retention.bytes" -> 1.asInstanceOf[AnyRef], // retain nothing | ||
| "log.retention.ms" -> 1.asInstanceOf[AnyRef] // no wait time | ||
| ) | ||
| testUtils = new KafkaTestUtils(withBrokerProps = brokerProps) |
Why disturb testUtils? Why not assign to another local var? Then you don't have to tear down and set up all this stuff.
| }
| }
|
| private def createDF(topic: String,
nit: should be

private def createDF(
    topic: String,
    withOptions: ...
Test build #72316 has finished for PR 16686 at commit

Test build #72333 has finished for PR 16686 at commit
LGTM!

@zsxwing please merge if you think your concerns were addressed correctly.
zsxwing
left a comment
LGTM except for the missing synchronized.
| val topicPartition = new TopicPartition(topic, partition)
| val key = CacheKey(groupId, topicPartition)
|
| val removedConsumer = cache.remove(key)
nit: please add synchronized.
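i.e. roughly (a sketch; the close() of the removed consumer is an assumption about what follows the remove, and the point is only the synchronized wrapper matching getOrCreate):

synchronized {
  val removedConsumer = cache.remove(key)
  if (removedConsumer != null) {
    removedConsumer.close()
  }
}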
| val topicPartition = new TopicPartition(topic, partition)
| val key = CacheKey(groupId, topicPartition)
|
| val consumer = cache.get(key)
nit: please add synchronized
Test build #72534 has finished for PR 16686 at commit

Test build #72536 has finished for PR 16686 at commit
LGTM. Merging to master and 2.1.
Today, you can start a stream that reads from Kafka. However, given Kafka's configurable retention period, it seems like sometimes you might just want to read all of the data that is available now. As such, we should add a version that works with spark.read as well.

The options should be the same as the streaming Kafka source, with the following differences: startingOffsets should default to earliest, and should not allow latest (which would always be empty). endingOffsets should also be allowed and should default to latest. The same assign JSON format as startingOffsets should also be accepted.

It would be really good if things like .limit(n) were enough to prevent all the data from being read (this might just work).

KafkaRelationSuite was added for testing batch queries via KafkaUtils.

Author: Tyson Condie <[email protected]>

Closes #16686 from tcondie/SPARK-18682.

(cherry picked from commit 8df4444)
Signed-off-by: Shixiong Zhu <[email protected]>
I have recently noticed a few flaky failures of the test KafkaSourceSuite "subscribing topic by pattern with topic deletions" (e.g., https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72725/testReport/org.apache.spark.sql.kafka010/KafkaSourceSuite/subscribing_topic_by_pattern_with_topic_deletions/). Is it possible those were caused by this PR? (Filed this JIRA: https://issues.apache.org/jira/browse/SPARK-19559)
Hi @kayousterhout, #16902 is a fix for the flaky test.
What changes were proposed in this pull request?
Today, you can start a stream that reads from Kafka. However, given Kafka's configurable retention period, it seems like sometimes you might just want to read all of the data that is available now. As such, we should add a version that works with spark.read as well.
The options should be the same as the streaming kafka source, with the following differences:
startingOffsets should default to earliest, and should not allow latest (which would always be empty).
endingOffsets should also be allowed and should default to latest. The same assign JSON format as startingOffsets should also be accepted.
It would be really good if things like .limit(n) were enough to prevent all the data from being read (this might just work).
How was this patch tested?
KafkaRelationSuite was added for testing batch queries via KafkaUtils.