diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 099d6ff13051b..14dcbeef0d9a3 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -194,6 +194,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") .option("kafka.request.timeout.ms", "3000") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala index 5b634e4d50641..54ce1717acc71 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala @@ -224,6 +224,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaM .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") .option("kafka.request.timeout.ms", "3000") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", "failOnDataLoss.*") .option("startingOffsets", "earliest") .option("failOnDataLoss", "false") diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 517d153ca3c91..63659989dec1b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -364,6 +364,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") .option("kafka.request.timeout.ms", "3000") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") @@ -401,6 +402,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") .option("kafka.request.timeout.ms", "3000") + .option("kafka.default.api.timeout.ms", "3000") .option("startingOffsets", "earliest") .option("subscribePattern", s"$topicPrefix-.*") @@ -590,6 +592,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") .option("kafka.request.timeout.ms", "3000") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribe", topic) // If a topic is deleted and we try to poll data starting from offset 0, // the Kafka consumer will just block until timeout and return an empty result. @@ -1861,6 +1864,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest { .option("subscribePattern", "stress.*") .option("failOnDataLoss", "false") .option("kafka.request.timeout.ms", "3000") + .option("kafka.default.api.timeout.ms", "3000") .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]