@@ -30,17 +30,16 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.util.UninterruptibleThread

/**
* A [[MicroBatchReadSupport]] that reads data from Kafka.
* A [[MicroBatchStream]] that reads data from Kafka.
*
* The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
* a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
@@ -55,13 +54,13 @@ import org.apache.spark.util.UninterruptibleThread
* To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
* and do not use wrong broker addresses.
*/
private[kafka010] class KafkaMicroBatchReadSupport(
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
options: DataSourceOptions,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {

private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
@@ -94,16 +93,9 @@ private[kafka010] class KafkaMicroBatchReadSupport(
endPartitionOffsets
}

override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema

override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
}

override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
val sc = config.asInstanceOf[SimpleStreamingScanConfig]
val startPartitionOffsets = sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val endPartitionOffsets = sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets

// Find the new partitions, and get their earliest offsets
val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
@@ -168,7 +160,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
}.toArray
}

override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
override def createReaderFactory(): PartitionReaderFactory = {
KafkaMicroBatchReaderFactory
}
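The net effect of the changes in this file is that offsets now travel through the method signatures themselves: the ScanConfigBuilder/ScanConfig indirection is gone, and planInputPartitions takes the start and end offsets directly. Below is a minimal sketch of how the new surface is exercised, assuming an already-constructed KafkaMicroBatchStream; the helper, topic name, and offset values are hypothetical, and the calls mirror the test changes later in this PR.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory}

// Hypothetical helper, for illustration only.
def planOneBatch(stream: KafkaMicroBatchStream): (Array[InputPartition], PartitionReaderFactory) = {
  // KafkaSourceOffset wraps a Map[TopicPartition, Long]; per the class doc,
  // each stored value is 1 + the last available offset.
  val start = KafkaSourceOffset(Map(new TopicPartition("topic", 0) -> 0L))
  val end = KafkaSourceOffset(Map(new TopicPartition("topic", 0) -> 100L))

  // Offsets are passed straight to planInputPartitions; no ScanConfig is built.
  val partitions = stream.planInputPartitions(start, end)

  // The reader factory is likewise created without a ScanConfig; each
  // InputPartition plus this factory yields a per-executor PartitionReader.
  val factory = stream.createReaderFactory()
  (partitions, factory)
}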

@@ -31,6 +31,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -47,7 +49,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with CreatableRelationProvider
with StreamingWriteSupportProvider
with ContinuousReadSupportProvider
with MicroBatchReadSupportProvider
with TableProvider
with Logging {
import KafkaSourceProvider._

@@ -101,40 +103,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}

/**
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
* batches of Kafka data in a micro-batch streaming query.
*/
override def createMicroBatchReadSupport(
metadataPath: String,
options: DataSourceOptions): KafkaMicroBatchReadSupport = {

val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams = convertToSpecifiedParams(parameters)

val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy(caseInsensitiveParams),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")

new KafkaMicroBatchReadSupport(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
options,
metadataPath,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
override def getTable(options: DataSourceOptions): KafkaTable = {
new KafkaTable(strategy(options.asMap().asScala.toMap))
}

/**
@@ -434,6 +404,52 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
logWarning("maxOffsetsPerTrigger option ignored in batch queries")
}
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead {

override def name(): String = s"Kafka $strategy"

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}
}

class KafkaScan(options: DataSourceOptions) extends Scan {

override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams = convertToSpecifiedParams(parameters)

val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy(parameters),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")

new KafkaMicroBatchStream(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
options,
checkpointLocation,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
}
}
}

private[kafka010] object KafkaSourceProvider extends Logging {
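With createMicroBatchReadSupport removed, the micro-batch path now goes through the Table abstraction: getTable returns a KafkaTable, its ScanBuilder builds a KafkaScan, and the scan's toMicroBatchStream produces the KafkaMicroBatchStream. A usage sketch follows, assuming calling code that lives in the org.apache.spark.sql.kafka010 package (the provider is package-private); the broker, topic, and checkpoint values are placeholders.

import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Placeholder options; a real caller would pass the user's reader options.
val options = new DataSourceOptions(Map(
  "kafka.bootstrap.servers" -> "host:9092",
  "subscribe" -> "topic").asJava)

// Table -> ScanBuilder -> Scan -> MicroBatchStream replaces the old
// createMicroBatchReadSupport(checkpointLocation, options) entry point.
val provider = new KafkaSourceProvider()
val table = provider.getTable(options)
val stream = table.newScanBuilder(options)
  .build()
  .toMicroBatchStream("/tmp/checkpoint") // checkpoint path is a placeholder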
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger

@@ -208,7 +208,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: DataSourceV2StreamingScanExec
case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists { config =>
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: DataSourceV2StreamingScanExec
case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists(_.knownPartitions.size == newCount),
@@ -35,7 +35,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -118,11 +118,13 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
val sources: Seq[BaseStreamingSource] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _) => source
case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
case r: StreamingDataSourceV2Relation
if r.stream.isInstanceOf[KafkaMicroBatchStream] =>
r.stream.asInstanceOf[KafkaMicroBatchStream]
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
case r: StreamingDataSourceV2Relation
case r: OldStreamingDataSourceV2Relation
if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
}
@@ -1062,9 +1064,10 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
testStream(kafka)(
makeSureGetOffsetCalled,
AssertOnQuery { query =>
query.logicalPlan.collect {
case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
}.nonEmpty
query.logicalPlan.find {
case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[KafkaMicroBatchStream]
case _ => false
}.isDefined
}
)
}
@@ -1088,13 +1091,12 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
val readSupport = provider.createMicroBatchReadSupport(
dir.getAbsolutePath, new DataSourceOptions(options.asJava))
val config = readSupport.newScanConfigBuilder(
val dsOptions = new DataSourceOptions(options.asJava)
val table = provider.getTable(dsOptions)
val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
val inputPartitions = stream.planInputPartitions(
KafkaSourceOffset(Map(tp -> 0L)),
KafkaSourceOffset(Map(tp -> 100L))).build()
val inputPartitions = readSupport.planInputPartitions(config)
.map(_.asInstanceOf[KafkaMicroBatchInputPartition])
KafkaSourceOffset(Map(tp -> 100L))).map(_.asInstanceOf[KafkaMicroBatchInputPartition])
withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
assert(inputPartitions.size == numPartitionsGenerated)
inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
@@ -1410,7 +1412,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
val reader = spark
.readStream
.format("kafka")
.option("startingOffsets", s"latest")
.option("startingOffsets", "latest")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("failOnDataLoss", failOnDataLoss.toString)

This file was deleted.

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;

/**
* An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
* micro-batch mode.
* <p>
* If a {@link Table} implements this interface, the
* {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
* builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
* </p>
*/
@Evolving
public interface SupportsMicroBatchRead extends SupportsRead { }
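As a sketch of the contract spelled out in the Javadoc: a table that mixes in SupportsMicroBatchRead must return a ScanBuilder whose Scan implements toMicroBatchStream, which is exactly the shape of the KafkaTable/KafkaScan pair added earlier in this PR. A hypothetical minimal implementation (all names below are invented for illustration):

import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table}
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

// Hypothetical source; `newStream` stands in for whatever builds the actual stream.
class ExampleTable(tableSchema: StructType, newStream: String => MicroBatchStream)
  extends Table with SupportsMicroBatchRead {

  override def name(): String = "example"
  override def schema(): StructType = tableSchema

  // The marker interface only promises that the Scan built here implements
  // toMicroBatchStream; the ScanBuilder and Scan shapes are otherwise unchanged.
  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new Scan {
      override def readSchema(): StructType = tableSchema
      override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
        newStream(checkpointLocation)
    }
  }
}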
Contributor:
Instead of just being a marker interface, could these be a mixin interface for the Scan (that also defines the respective methods)? like:

public interface SupportsBatchRead extends Scan {
   Batch toBatch();
}

and so on. And if a Table supports Read one could query its Scan object to figure out the type if required.

Contributor Author:
We need to know the capability at the table level. It's too late to do it at the scan level, as creating a scan may be expensive.

Contributor:
Only thing is it doesn't enforce anything. A method like Table.supportedTypes() might also work.

Contributor:
@arunmahadevan, that's similar to the capabilities that we plan to add. Spark will query specific capabilities for a table to make determinations like this to cut down on the number of empty interfaces.
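For context on the last comment, the planned capabilities mechanism would let Spark ask a table what it supports instead of relying on empty marker interfaces. A purely hypothetical sketch of that direction; none of these names exist in this PR:

// Hypothetical only: a capability set queried at the table level,
// replacing marker interfaces such as SupportsMicroBatchRead.
trait TableWithCapabilities {
  def capabilities(): Set[String]
}

def supportsMicroBatchRead(table: TableWithCapabilities): Boolean =
  table.capabilities().contains("MICRO_BATCH_READ")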
