diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 03f82a505602f..8c26bc0d172ba 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[ContinuousStream]] for data from kafka. @@ -37,7 +38,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._ * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be * read by per-task consumers generated later. * @param kafkaParams String params for per-task Kafka consumers. - * @param sourceOptions Params which are not Kafka consumer params. + * @param options Params which are not Kafka consumer params. * @param metadataPath Path to a directory this reader can use for writing metadata. * @param initialOffsets The Kafka offsets to start reading data at. * @param failOnDataLoss Flag indicating whether reading should fail in data loss @@ -47,14 +48,14 @@ import org.apache.spark.sql.sources.v2.reader.streaming._ class KafkaContinuousStream( offsetReader: KafkaOffsetReader, kafkaParams: ju.Map[String, Object], - sourceOptions: Map[String, String], + options: CaseInsensitiveStringMap, metadataPath: String, initialOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) extends ContinuousStream with Logging { private val pollTimeoutMs = - sourceOptions.getOrElse(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, "512").toLong + options.getLong(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, 512) // Initialized when creating reader factories. If this diverges from the partitions at the latest // offsets, we need to reconfigure. diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index dbd3310d48a8a..1eb4f85087f83 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -449,7 +449,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister new KafkaContinuousStream( kafkaOffsetReader, kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - parameters, + options, checkpointLocation, startingStreamOffsets, failOnDataLoss(caseInsensitiveParams))