Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A [[ContinuousStream]] for data from kafka.
*
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
* @param sourceOptions Params which are not Kafka consumer params.
* @param options Params which are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.
* @param failOnDataLoss Flag indicating whether reading should fail in data loss
Expand All @@ -47,14 +48,14 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
class KafkaContinuousStream(
offsetReader: KafkaOffsetReader,
kafkaParams: ju.Map[String, Object],
sourceOptions: Map[String, String],
options: CaseInsensitiveStringMap,
metadataPath: String,
initialOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends ContinuousStream with Logging {

private val pollTimeoutMs =
sourceOptions.getOrElse(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, "512").toLong
options.getLong(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, 512)

// Initialized when creating reader factories. If this diverges from the partitions at the latest
// offsets, we need to reconfigure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
new KafkaContinuousStream(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
parameters,
options,
checkpointLocation,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
Expand Down