[SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 #21488
Test build #91429 has finished for PR 21488 at commit
zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM)
adminZkClient = new AdminZkClient(zkClient)
Can we use the Java AdminClient instead of these internal classes?
AdminClient is abstract.
KafkaAdminClient doesn't provide addPartitions.
Mind giving some pointers?
AdminClient.create gives you a concrete instance. createPartitions is the method you're looking for.
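For reference, a minimal sketch of that approach; the broker address and topic name here are hypothetical, for illustration only:

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, NewPartitions}

// Hypothetical connection settings, for illustration only.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")

val adminClient = AdminClient.create(props) // concrete instance
try {
  // Grow "my-topic" to 10 partitions; all().get() blocks until the
  // broker confirms the change.
  val request = Map("my-topic" -> NewPartitions.increaseTo(10)).asJava
  adminClient.createPartitions(request).all().get()
} finally {
  adminClient.close()
}
```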
Test build #91431 has finished for PR 21488 at commit
Test build #91432 has finished for PR 21488 at commit
val existingAssignment = zkClient.getReplicaAssignmentForTopics(
  collection.immutable.Set(topic)).map {
  case (topicPartition, replicas) => topicPartition.partition -> replicas
}
We can get replica assignment information via AdminClient too. I think we should try to avoid the internal ZkUtils and KafkaZkClient as much as we can.
+1
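For reference, a minimal sketch of reading the same assignment through the public admin API, reusing the hypothetical `adminClient` and topic from the sketch above:

```scala
import scala.collection.JavaConverters._

// Reusing the hypothetical `adminClient` from the earlier sketch.
val topic = "my-topic"
val description = adminClient
  .describeTopics(Seq(topic).asJava)
  .all().get()        // java.util.Map[String, TopicDescription]
  .get(topic)

// partition id -> replica broker ids, analogous to what
// getReplicaAssignmentForTopics returns.
val existingAssignment = description.partitions().asScala.map { info =>
  info.partition() -> info.replicas().asScala.map(_.id())
}.toMap
```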
Test build #91485 has finished for PR 21488 at commit
Test build #91487 has finished for PR 21488 at commit
KafkaMicroBatchSourceSuite hangs with the current PR. Looking at KafkaOffsetReader.scala, I see a call to KafkaConsumer.poll(0).
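For context on why the poll behaviour matters here: KIP-266, which this upgrade pulls in, added a KafkaConsumer.poll(Duration) overload whose timeout bounds the whole call, including metadata fetches, whereas the older poll(long) can block indefinitely waiting for metadata. A minimal sketch contrasting the two overloads, with connection settings and topic name made up for illustration:

```scala
import java.time.{Duration => JDuration}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical connection settings and topic, for illustration only.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "spark-kafka-source-test")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(Collections.singletonList("my-topic"))

// Pre-KIP-266 overload: blocks until cluster metadata is available, even
// with a zero timeout, so callers see their assignment after it returns.
consumer.poll(0L)

// KIP-266 overload: the timeout bounds the entire call, metadata fetches
// included, so this may return before any partitions are assigned; code
// relying on poll(0) side effects has to account for that.
consumer.poll(JDuration.ZERO)
```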
I tried the following change but didn't seem to get more output from Kafka:
external/kafka-0-10-sql/pom.xml (outdated)
<kafka.version>2.0.0-SNAPSHOT</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>
We should change this line to reflect the change too.
Test build #91508 has finished for PR 21488 at commit
There is only target/surefire-reports/TEST-org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite.xml under target/surefire-reports. That file doesn't contain the test log.
Test build #91529 has finished for PR 21488 at commit
|
|
Test build #91587 has finished for PR 21488 at commit
|
|
Made some progress in testing.
Test build #91589 has finished for PR 21488 at commit
|
|
Located the test output. Still need to find out the cause of the assertion failure.
Any luck getting to the bottom of the issue? It would be great to include this in the next version of Spark.
@tedyu could you please just bump to 1.1.0, the current official latest release from Apache Kafka?
external/kafka-0-10-sql/pom.xml (outdated)
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
Where does this come from? Or can it be just a test dependency?
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.poll(JDuration.ofMillis(0))
Could you revert these changes? We don't use java.time.Duration in Spark.
Depending on the Kafka release we agree upon, I can revert.
Duration is the recommended API in the 2.0.0 release.
@tedyu just realized this is ofMillis rather than toMillis. We definitely cannot use it as this poll overload doesn't exist in previous versions and we want to support Kafka versions from 0.10 to 2.0.
@zsxwing Why do you want to support Kafka clients jars from 0.10 to 2.0? Since newer clients jars support older brokers, we recommend people use the latest Kafka clients jar whenever possible.
That's a good point. However, supporting all these versions is pretty cheap for Spark right now. Spark is using only APIs in 0.10. In addition, if the Kafka client version we pick up here has some critical issue, the user can just switch to an old version.
Ryan: Once Kafka 2.0.0 is released, I will incorporate the above.
Test build #93736 has finished for PR 21488 at commit
Doesn't seem to be related to the PR.
retest this please
Test build #93745 has finished for PR 21488 at commit
The Kafka 2.0.0 release is publicly available: https://github.com/apache/kafka/releases. Can we finish this before the code freeze?
@zsxwing
I might have missed this in the shuffle here: is this fully compatible with 0.10.x brokers too?
Yep. @ijuma could you confirm it?
Yes, Kafka clients 0.10.2 and higher support brokers from 0.10.0 and higher, as long as the protocols being used are available in that version. Since we only changed test code in this PR, upgrading to 2.0.0 is safe. If Spark were to use the AdminClient to create topics, then the brokers would have to be 0.10.1 or higher. Other admin protocols, like config management, were added in more recent versions, and so on.
@ijuma this changes the version of
@srowen I meant that the library is compatible, so if you just change the version in the pom, it's fine. If you changed the code to use some methods in AdminClient, then you'd have to be more careful. Does that make sense? The major version bump is because we now require Java 8 (which I believe you require too) and the Scala clients were removed.
Yes, and that then impacts programs that depend on this module to use Kafka 0.10+ from Spark. They'd have to update. Yes, Spark requires Java 8. You're saying that although the (two) major version bumps look scary, there was little change in the client library itself?
@srowen just to be clear, In addition, even if this upgrade has some unexpected issues, the user can still switch back to an old
Yeah, the Java client libraries have evolved in a compatible manner for the most part since 0.10.0. The set of broker versions supported by 0.10.0 and 2.0.0 is exactly the same. The consumer/producer API has been enriched (transactions, idempotent producer, offsetsForTimes), but the existing methods have been kept. A small number of deprecated, but rarely used, methods have been removed (not in KafkaProducer or KafkaConsumer though). In 0.10.1, a heartbeat thread was added to the Java consumer; this is helpful to users who could not call
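As a side note, a minimal sketch of the offsetsForTimes API mentioned above (added in 0.10.1), reusing the hypothetical `consumer` and topic from the earlier sketch:

```scala
import java.util.Collections

import org.apache.kafka.common.TopicPartition

// Reusing the hypothetical `consumer` from the earlier sketch; the
// timestamp is epoch milliseconds and the partition is assumed assigned.
val tp = new TopicPartition("my-topic", 0)
val query = Collections.singletonMap(tp, java.lang.Long.valueOf(1533081600000L))

// The map value is null for a partition with no message at or after the
// requested timestamp.
val offsetAndTs = consumer.offsetsForTimes(query).get(tp)
if (offsetAndTs != null) {
  // Seek to the first offset whose timestamp is >= the requested one.
  consumer.seek(tp, offsetAndTs.offset())
}
```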
@zsxwing
@ijuma I see. I was looking at the 0.10 jar. Thanks for correcting me.
Anyway, overall I think you should definitely make this change. Spark users are currently penalised heavily when running on clusters with the message format introduced in 0.11.0, which has important resiliency improvements (e.g. KIP-101). And, as @zsxwing said, people can choose an older version if necessary (I can't think of a reason why; the reason we have focused on making client jars compatible is so that people can just use the latest independently of broker versions).
Yes, good argument. Having been burned in the past (actually by Kafka and ZK changes, though that's in the past), I'm aware that even compiling against version B instead of A, with no library code changes, may still mean the library no longer runs against version A. Changing exception signatures is also sometimes a problem. It may not be the case here, and it's unlikely, but I do want to think this through fully. Yes, differences like apache/kafka@a4c2921 are what I am worried about. Still, I have no concrete example of this type of problem in the wild.
@srowen May I read your comment as "no objections"? The current PR looks good to me. If you don't have objections, I will go ahead and merge it.
Yes, no objections. There's no concrete problem, and there is upside to making the change. I think you're aware of the potential issues, and have thought through them in the context of deeper knowledge of Kafka's APIs than I have.
Thanks! Merging to master.
It seems this commit causes the following test failure:
[info] KafkaSourceStressSuite:
[info] - stress test with multiple topics and partitions (19 seconds, 255 milliseconds)
[info] KafkaSourceStressForDontFailOnDataLossSuite:
[info] - stress test for failOnDataLoss=false *** FAILED *** (45 seconds, 648 milliseconds)
[info] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
[info] at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
[info] at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
[info] at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils$$anonfun$2.apply(KafkaTestUtils.scala:254)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils$$anonfun$2.apply(KafkaTestUtils.scala:248)
[info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info] at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
[info] at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
[info] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info] at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.sendMessages(KafkaTestUtils.scala:248)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.sendMessages(KafkaTestUtils.scala:238)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite$$anonfun$48$$anonfun$apply$26$$anonfun$apply$27.apply(KafkaMicroBatchSourceSuite.scala:1268)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite$$anonfun$48$$anonfun$apply$26$$anonfun$apply$27.apply(KafkaMicroBatchSourceSuite.scala:1267)
[info] at scala.collection.immutable.Range.foreach(Range.scala:160)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite$$anonfun$48$$anonfun$apply$26.apply(KafkaMicroBatchSourceSuite.scala:1267)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite$$anonfun$48$$anonfun$apply$26.apply(KafkaMicroBatchSourceSuite.scala:1265)
[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite$$anonfun$48.apply(KafkaMicroBatchSourceSuite.scala:1265)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
[info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:1155)
[info] at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info] at org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.runTest(KafkaMicroBatchSourceSuite.scala:1155)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info] at scala.collection.immutable.List.foreach(List.scala:392)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info] at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info] at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info] at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info] at org.scalatest.Suite$class.run(Suite.scala:1147)
[info] at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info] at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
[info] at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info] at java.lang.Thread.run(Thread.java:748)
[info] Cause: org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
Exception in thread "Thread-9" java.io.EOFException
at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2954)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1$React.react(Framework.scala:809)
at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1.run(Framework.scala:798)
at java.lang.Thread.run(Thread.java:748)

How to reproduce: build/sbt -Phadoop-2.6 "sql-kafka-0-10/testOnly"
I used the following command and the test passed:
mvn test -Phadoop-2.6 -Pyarn -Phive -Dtest=KafkaMicroBatchSourceSuite -rf external/kafka-0-10-sql
Please take a look at the 'Disk error' message and see if it was related to the test failure.
Hm, looking at the dashboard, I don't see this failure consistently in the master tests: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/
Let's keep a close eye on it to see if it repeats. It might have been something transient.
@wangyum, can you please file a Kafka JIRA with details of what the test is doing (even if the failure is transient)? From the stacktrace, it looks like a potential broker issue (assuming there are no real disk issues where these tests were executed). If there is indeed a new issue (we have to verify, since the test failure seems to be transient), it would likely only affect tests.
Ack, I missed something here: there's an override of kafka.version for Scala 2.12, from when it had to be bumped up to work with 2.12. That no longer works when compiling with 2.12. I'll submit a follow-up.
What changes were proposed in this pull request?
This PR upgrades to the Kafka 2.0.0 release, where KIP-266 is integrated.
How was this patch tested?
This PR uses existing Kafka-related unit tests.