[SPARK-26695][SQL] data source v2 API refactor - continuous read #23619
Changes to KafkaSourceProvider:

```diff
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -48,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with RelationProvider
     with CreatableRelationProvider
     with StreamingWriteSupportProvider
-    with ContinuousReadSupportProvider
     with TableProvider
     with Logging {
   import KafkaSourceProvider._
@@ -107,46 +106,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     new KafkaTable(strategy(options.asMap().asScala.toMap))
   }
 
-  /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-      metadataPath: String,
-      options: DataSourceOptions): KafkaContinuousReadSupport = {
-    val parameters = options.asMap().asScala.toMap
-    validateStreamOptions(parameters)
-    // Each running query should use its own group id. Otherwise, the query may be only assigned
-    // partial data since Kafka will assign partitions to multiple consumers having the same group
-    // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
-
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
-
-    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-    val kafkaOffsetReader = new KafkaOffsetReader(
-      strategy(caseInsensitiveParams),
-      kafkaParamsForDriver(specifiedKafkaParams),
-      parameters,
-      driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-    new KafkaContinuousReadSupport(
-      kafkaOffsetReader,
-      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-      parameters,
-      metadataPath,
-      startingStreamOffsets,
-      failOnDataLoss(caseInsensitiveParams))
-  }
-
   /**
    * Returns a new base relation with the given parameters.
    *
@@ -406,7 +365,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead {
+    with SupportsMicroBatchRead with SupportsContinuousRead {
 
     override def name(): String = s"Kafka $strategy"
 
@@ -449,6 +408,40 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         startingStreamOffsets,
         failOnDataLoss(caseInsensitiveParams))
     }
+
+    override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+      val parameters = options.asMap().asScala.toMap
+      validateStreamOptions(parameters)
+      // Each running query should use its own group id. Otherwise, the query may be only assigned
+      // partial data since Kafka will assign partitions to multiple consumers having the same group
+      // id. Hence, we should generate a unique id for each query.
+      val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
+
+      val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+      val specifiedKafkaParams =
+        parameters
+          .keySet
+          .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+          .map { k => k.drop(6).toString -> parameters(k) }
+          .toMap
+
+      val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
+        caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+      val kafkaOffsetReader = new KafkaOffsetReader(
+        strategy(caseInsensitiveParams),
+        kafkaParamsForDriver(specifiedKafkaParams),
+        parameters,
+        driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+      new KafkaContinuousStream(
+        kafkaOffsetReader,
+        kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+        parameters,
+        checkpointLocation,
+        startingStreamOffsets,
+        failOnDataLoss(caseInsensitiveParams))
+    }
+  }
 }
```
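For context, the refactored continuous path is exercised through the regular structured-streaming API: when a query runs with a continuous trigger, planning goes through the Kafka `Table` (now also `SupportsContinuousRead`) and `toContinuousStream(checkpointLocation)` rather than the removed `createContinuousReadSupport` hook. Below is a minimal sketch of such a query; the bootstrap servers, topic name, and checkpoint path are placeholders, not values from this PR.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousKafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("continuous-kafka-sketch")
      .getOrCreate()

    // "kafka."-prefixed options are forwarded to the Kafka consumer; the provider
    // strips the prefix, as shown in the diff above.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092") // placeholder
      .option("subscribe", "topic1")                   // placeholder
      .option("startingOffsets", "latest")
      .load()

    // The continuous trigger selects the continuous execution engine, which is the
    // code path that now ends up in toContinuousStream / KafkaContinuousStream.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/continuous-kafka") // placeholder
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}
```

As in the old `createContinuousReadSupport`, the new `toContinuousStream` derives a unique consumer group id per query from the checkpoint location, so concurrent queries against the same topic are not assigned only partial data by Kafka's group-based partition assignment.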
Changes to KafkaContinuousSourceTopicDeletionSuite:

```diff
@@ -209,11 +209,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
       assert(
         query.lastExecution.executedPlan.collectFirst {
           case scan: ContinuousScanExec
-              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
-        }.exists { config =>
+              if scan.stream.isInstanceOf[KafkaContinuousStream] =>
+            scan.stream.asInstanceOf[KafkaContinuousStream]
+        }.exists { stream =>
           // Ensure the new topic is present and the old topic is gone.
-          config.knownPartitions.exists(_.topic == topic2)
+          stream.knownPartitions.exists(_.topic == topic2)
         },
         s"query never reconfigured to new topic $topic2")
     }
```

Contributor, commenting on the added `if scan.stream.isInstanceOf[KafkaContinuousStream] =>` line: I think this logic is correct, but let's keep an eye on the tests after merging since some flakiness slipped through in the last iteration of the refactoring.
This file was deleted.
moved from https://github.com/apache/spark/pull/23619/files#diff-75718e2fd0d84469b882e6db9896e1b8L162