From 78bf63958e2f85490b2a23fbb249c6da6585d7a6 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 7 Oct 2018 22:55:28 -0700 Subject: [PATCH 1/3] [SPARK-25631][SPARK-25632] Improve the test runtime of KafkaRDDSuite --- .../org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 561bca5f5537..d0fc780e2560 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -41,6 +41,10 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val sparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getSimpleName) + // Set a poll time out of 10 seconds. Othewise the poll time out defaults to + // 2 minutes causing the test cases to run longer. + .set("spark.streaming.kafka.consumer.poll.ms", "10000") + private var sc: SparkContext = _ override def beforeAll { From 47e0940a0795b85d0b2c34596fa5e21850feb5cb Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 7 Oct 2018 23:05:06 -0700 Subject: [PATCH 2/3] doc fix --- .../org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index d0fc780e2560..47bc8fec2c80 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -41,8 +41,8 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val sparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - // Set a poll time out of 10 seconds. Othewise the poll time out defaults to - // 2 minutes causing the test cases to run longer. + // Set a timeout of 10 seconds that's going to be used to fetch topics/partitions from kafka. + // Othewise the poll timeout defaults to 2 minutes and causes test cases to run longer. .set("spark.streaming.kafka.consumer.poll.ms", "10000") private var sc: SparkContext = _ From 609daba4f851660e010b89f995f965736fe3d8df Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 16 Oct 2018 14:29:36 -0700 Subject: [PATCH 3/3] Code review --- .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 1974bb1e12e1..93d0d2fd06c7 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -52,6 +52,9 @@ class DirectKafkaStreamSuite val sparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) + // Set a timeout of 10 seconds that's going to be used to fetch topics/partitions from kafka. + // Othewise the poll timeout defaults to 2 minutes and causes test cases to run longer. + .set("spark.streaming.kafka.consumer.poll.ms", "10000") private var ssc: StreamingContext = _ private var testDir: File = _