From 23c2d715a00e156f5e4fea0a29459435132d8307 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 25 Jun 2019 12:22:51 +0200 Subject: [PATCH 1/9] [SPARK-28163][SS] Use CaseInsensitiveMap for KafkaOffsetReader --- .../sql/kafka010/KafkaSourceProvider.scala | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 1eb4f85087f83..abba2d6138f4c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.internal.Logging import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ @@ -393,23 +394,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { - val parameters = options.asScala.toMap - validateStreamOptions(parameters) + val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap) + validateStreamOptions(caseInsensitiveOptions) // Each running query should use its own group id. Otherwise, the query may be only assigned // partial data since Kafka will assign partitions to multiple consumers having the same group // id. Hence, we should generate a unique id for each query. - val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation) + val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation) - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = convertToSpecifiedParams(parameters) + val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) val kafkaOffsetReader = new KafkaOffsetReader( - strategy(parameters), + strategy(caseInsensitiveOptions), kafkaParamsForDriver(specifiedKafkaParams), - parameters, + caseInsensitiveOptions, driverGroupIdPrefix = s"$uniqueGroupId-driver") new KafkaMicroBatchStream( @@ -418,32 +418,26 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister options, checkpointLocation, startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + failOnDataLoss(caseInsensitiveOptions)) } override def toContinuousStream(checkpointLocation: String): ContinuousStream = { - val parameters = options.asScala.toMap - validateStreamOptions(parameters) + val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap) + validateStreamOptions(caseInsensitiveOptions) // Each running query should use its own group id. 
Otherwise, the query may be only assigned // partial data since Kafka will assign partitions to multiple consumers having the same group // id. Hence, we should generate a unique id for each query. - val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation) + val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation) - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap + val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), + strategy(caseInsensitiveOptions), kafkaParamsForDriver(specifiedKafkaParams), - parameters, + caseInsensitiveOptions, driverGroupIdPrefix = s"$uniqueGroupId-driver") new KafkaContinuousStream( @@ -452,7 +446,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister options, checkpointLocation, startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + failOnDataLoss(caseInsensitiveOptions)) } } } From a4790ad327f42b929bd13076c25582b6befdd9c3 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 4 Jul 2019 16:04:51 +0200 Subject: [PATCH 2/9] Added tests + converting incoming Map to CaseInsensitiveMap --- .../spark/sql/kafka010/KafkaBatch.scala | 3 +- .../sql/kafka010/KafkaContinuousStream.scala | 2 +- .../sql/kafka010/KafkaMicroBatchStream.scala | 20 ++-- .../sql/kafka010/KafkaOffsetReader.scala | 19 ++-- .../spark/sql/kafka010/KafkaRelation.scala | 4 +- .../sql/kafka010/KafkaSourceProvider.scala | 34 +++--- .../kafka010/KafkaSourceProviderSuite.scala | 104 +++++++++++++----- 7 files changed, 118 insertions(+), 68 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala index e3c8536d22070..f89be1410ff6a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala @@ -22,12 +22,13 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReaderFactory} private[kafka010] class KafkaBatch( strategy: ConsumerStrategy, - sourceOptions: Map[String, String], + sourceOptions: CaseInsensitiveMap[String], specifiedKafkaParams: Map[String, String], failOnDataLoss: Boolean, startingOffsets: KafkaOffsetRangeLimit, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 8c26bc0d172ba..9489b7f5e9ecb 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * properly read. */ class KafkaContinuousStream( - offsetReader: KafkaOffsetReader, + private val offsetReader: KafkaOffsetReader, kafkaParams: ju.Map[String, Object], options: CaseInsensitiveStringMap, metadataPath: String, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 9fb338f0e6688..f3c877ce7b566 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.UninterruptibleThread * and not use wrong broker addresses. */ private[kafka010] class KafkaMicroBatchStream( - kafkaOffsetReader: KafkaOffsetReader, + private val offsetReader: KafkaOffsetReader, executorKafkaParams: ju.Map[String, Object], options: CaseInsensitiveStringMap, metadataPath: String, @@ -85,7 +85,7 @@ private[kafka010] class KafkaMicroBatchStream( override def latestOffset(start: Offset): Offset = { val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets - val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) + val latestPartitionOffsets = offsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets => rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets) }.getOrElse { @@ -100,7 +100,7 @@ private[kafka010] class KafkaMicroBatchStream( // Find the new partitions, and get their earliest offsets val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet) - val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq) + val newPartitionInitialOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) if (newPartitionInitialOffsets.keySet != newPartitions) { // We cannot get from offsets for some partitions. It means they got deleted. val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet) @@ -117,7 +117,7 @@ private[kafka010] class KafkaMicroBatchStream( val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet) if (deletedPartitions.nonEmpty) { val message = - if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { s"$deletedPartitions are gone. Some data may have been missed." 
@@ -172,10 +172,10 @@ private[kafka010] class KafkaMicroBatchStream( override def commit(end: Offset): Unit = {} override def stop(): Unit = { - kafkaOffsetReader.close() + offsetReader.close() } - override def toString(): String = s"KafkaV2[$kafkaOffsetReader]" + override def toString(): String = s"KafkaV2[$offsetReader]" /** * Read initial partition offsets from the checkpoint, or decide the offsets and write them to @@ -195,11 +195,11 @@ private[kafka010] class KafkaMicroBatchStream( metadataLog.get(0).getOrElse { val offsets = startingOffsets match { case EarliestOffsetRangeLimit => - KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets()) + KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => - KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None)) + KafkaSourceOffset(offsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => - kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) + offsetReader.fetchSpecificOffsets(p, reportDataLoss) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") @@ -212,7 +212,7 @@ private[kafka010] class KafkaMicroBatchStream( limit: Long, from: PartitionOffsetMap, until: PartitionOffsetMap): PartitionOffsetMap = { - val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq) + val fromNew = offsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq) val sizes = until.flatMap { case (tp, end) => // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index ad608ecafe59f..13fce488c71ef 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.types._ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread} @@ -47,7 +48,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread} private[kafka010] class KafkaOffsetReader( consumerStrategy: ConsumerStrategy, val driverKafkaParams: ju.Map[String, Object], - readerOptions: Map[String, String], + readerOptions: CaseInsensitiveMap[String], driverGroupIdPrefix: String) extends Logging { /** * Used to ensure execute fetch operations execute in an UninterruptibleThread @@ -88,10 +89,10 @@ private[kafka010] class KafkaOffsetReader( _consumer } - private val maxOffsetFetchAttempts = + private val fetchOffsetNumRetries = readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt - private val offsetFetchAttemptIntervalMs = + private val fetchOffsetRetryIntervalMs = readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong private def nextGroupId(): String = { @@ -293,12 +294,12 @@ private[kafka010] class KafkaOffsetReader( if (incorrectOffsets.nonEmpty) { logWarning("Found incorrect offsets in some partitions " + s"(partition, previous offset, fetched offset): $incorrectOffsets") - if (attempt < maxOffsetFetchAttempts) { + if (attempt < fetchOffsetNumRetries) { logWarning("Retrying to 
fetch latest offsets because of incorrect offsets") - Thread.sleep(offsetFetchAttemptIntervalMs) + Thread.sleep(fetchOffsetRetryIntervalMs) } } - } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) + } while (incorrectOffsets.nonEmpty && attempt < fetchOffsetNumRetries) logDebug(s"Got latest offsets for partition : $partitionOffsets") partitionOffsets @@ -371,7 +372,7 @@ private[kafka010] class KafkaOffsetReader( var result: Option[Map[TopicPartition, Long]] = None var attempt = 1 var lastException: Throwable = null - while (result.isEmpty && attempt <= maxOffsetFetchAttempts + while (result.isEmpty && attempt <= fetchOffsetNumRetries && !Thread.currentThread().isInterrupted) { Thread.currentThread match { case ut: UninterruptibleThread => @@ -389,7 +390,7 @@ private[kafka010] class KafkaOffsetReader( lastException = e logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e) attempt += 1 - Thread.sleep(offsetFetchAttemptIntervalMs) + Thread.sleep(fetchOffsetRetryIntervalMs) resetConsumer() } } @@ -402,7 +403,7 @@ private[kafka010] class KafkaOffsetReader( throw new InterruptedException() } if (result.isEmpty) { - assert(attempt > maxOffsetFetchAttempts) + assert(attempt > fetchOffsetNumRetries) assert(lastException != null) throw lastException } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index b2950cff98a0d..dd584a5987a07 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String @@ -33,7 +33,7 @@ import org.apache.spark.unsafe.types.UTF8String private[kafka010] class KafkaRelation( override val sqlContext: SQLContext, strategy: ConsumerStrategy, - sourceOptions: Map[String, String], + sourceOptions: CaseInsensitiveMap[String], specifiedKafkaParams: Map[String, String], failOnDataLoss: Boolean, startingOffsets: KafkaOffsetRangeLimit, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 98776f195a5e1..372bcab1cab30 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -78,32 +78,32 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = { - validateStreamOptions(parameters) + val caseInsensitiveParameters = CaseInsensitiveMap(parameters) + validateStreamOptions(caseInsensitiveParameters) // Each running query should use its own group id. 
Otherwise, the query may be only assigned // partial data since Kafka will assign partitions to multiple consumers having the same group // id. Hence, we should generate a unique id for each query. - val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath) + val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveParameters, metadataPath) - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } val specifiedKafkaParams = convertToSpecifiedParams(parameters) - val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( + caseInsensitiveParameters, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), + strategy(caseInsensitiveParameters), kafkaParamsForDriver(specifiedKafkaParams), - parameters, + caseInsensitiveParameters, driverGroupIdPrefix = s"$uniqueGroupId-driver") new KafkaSource( sqlContext, kafkaOffsetReader, kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - parameters, + caseInsensitiveParameters, metadataPath, startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + failOnDataLoss(caseInsensitiveParameters)) } override def getTable(options: CaseInsensitiveStringMap): KafkaTable = { @@ -119,24 +119,24 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - validateBatchOptions(parameters) - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } + val caseInsensitiveParameters = CaseInsensitiveMap(parameters) + validateBatchOptions(caseInsensitiveParameters) val specifiedKafkaParams = convertToSpecifiedParams(parameters) val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( - caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) + caseInsensitiveParameters, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) assert(startingRelationOffsets != LatestOffsetRangeLimit) - val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) + val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( + caseInsensitiveParameters, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) assert(endingRelationOffsets != EarliestOffsetRangeLimit) new KafkaRelation( sqlContext, - strategy(caseInsensitiveParams), - sourceOptions = parameters, + strategy(caseInsensitiveParameters), + sourceOptions = caseInsensitiveParameters, specifiedKafkaParams = specifiedKafkaParams, - failOnDataLoss = failOnDataLoss(caseInsensitiveParams), + failOnDataLoss = failOnDataLoss(caseInsensitiveParameters), startingOffsets = startingRelationOffsets, endingOffsets = endingRelationOffsets) } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index 2fcf37a184684..789e9c8a8b276 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -30,52 +30,100 @@ import 
org.apache.spark.sql.util.CaseInsensitiveStringMap class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { + private val expected = "666" private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs) private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger) + private val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader) + private val fetchOffsetNumRetriesMethod = PrivateMethod[Int]('fetchOffsetNumRetries) + private val fetchOffsetRetryIntervalMsMethod = PrivateMethod[Long]('fetchOffsetRetryIntervalMs) override protected def afterEach(): Unit = { SparkEnv.set(null) super.afterEach() } + test("batch mode - options should be handled as case-insensitive") { + verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, batch => { + assert(expected.toLong === getField(batch, pollTimeoutMsMethod)) + }) + } + test("micro-batch mode - options should be handled as case-insensitive") { - def verifyFieldsInMicroBatchStream( - options: CaseInsensitiveStringMap, - expectedPollTimeoutMs: Long, - expectedMaxOffsetsPerTrigger: Option[Long]): Unit = { - // KafkaMicroBatchStream reads Spark conf from SparkEnv for default value - // hence we set mock SparkEnv here before creating KafkaMicroBatchStream - val sparkEnv = mock(classOf[SparkEnv]) - when(sparkEnv.conf).thenReturn(new SparkConf()) - SparkEnv.set(sparkEnv) + verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { + assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) + }) + verifyFieldsInMicroBatchStream(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER, expected, stream => { + assert(Some(expected.toLong) === getField(stream, maxOffsetsPerTriggerMethod)) + }) + } - val scan = getKafkaDataSourceScan(options) - val stream = scan.toMicroBatchStream("dummy").asInstanceOf[KafkaMicroBatchStream] + test("SPARK-28163 - micro-batch mode - options should be handled as case-insensitive") { + verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { + val kafkaOffsetReader = getField(stream, offsetReaderMethod) + assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod)) + }) + verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, + stream => { + val kafkaOffsetReader = getField(stream, offsetReaderMethod) + assert(expected.toLong === getField(kafkaOffsetReader, fetchOffsetRetryIntervalMsMethod)) + }) + } + + test("SPARK-28142 - continuous mode - options should be handled as case-insensitive") { + verifyFieldsInContinuousStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { + assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) + }) + } - assert(expectedPollTimeoutMs === getField(stream, pollTimeoutMsMethod)) - assert(expectedMaxOffsetsPerTrigger === getField(stream, maxOffsetsPerTriggerMethod)) + test("SPARK-28163 - continuous mode - options should be handled as case-insensitive") { + verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { + val kafkaOffsetReader = getField(stream, offsetReaderMethod) + assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod)) + }) + verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, + stream => { + val kafkaOffsetReader = getField(stream, offsetReaderMethod) + assert(expected.toLong === getField(kafkaOffsetReader, 
fetchOffsetRetryIntervalMsMethod)) + }) + } + + private def verifyFieldsInBatch( + key: String, + value: String, + validate: (KafkaBatch) => Unit): Unit = { + buildCaseInsensitiveStringMapForUpperAndLowerKey(key -> value).foreach { options => + val scan = getKafkaDataSourceScan(options) + val batch = scan.toBatch().asInstanceOf[KafkaBatch] + validate(batch) } + } - val expectedValue = 1000L - buildCaseInsensitiveStringMapForUpperAndLowerKey( - KafkaSourceProvider.CONSUMER_POLL_TIMEOUT -> expectedValue.toString, - KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER -> expectedValue.toString) - .foreach(verifyFieldsInMicroBatchStream(_, expectedValue, Some(expectedValue))) + private def verifyFieldsInMicroBatchStream( + key: String, + value: String, + validate: (KafkaMicroBatchStream) => Unit): Unit = { + // KafkaMicroBatchStream reads Spark conf from SparkEnv for default value + // hence we set mock SparkEnv here before creating KafkaMicroBatchStream + val sparkEnv = mock(classOf[SparkEnv]) + when(sparkEnv.conf).thenReturn(new SparkConf()) + SparkEnv.set(sparkEnv) + + buildCaseInsensitiveStringMapForUpperAndLowerKey(key -> value).foreach { options => + val scan = getKafkaDataSourceScan(options) + val stream = scan.toMicroBatchStream("dummy").asInstanceOf[KafkaMicroBatchStream] + validate(stream) + } } - test("SPARK-28142 - continuous mode - options should be handled as case-insensitive") { - def verifyFieldsInContinuousStream( - options: CaseInsensitiveStringMap, - expectedPollTimeoutMs: Long): Unit = { + private def verifyFieldsInContinuousStream( + key: String, + value: String, + validate: (KafkaContinuousStream) => Unit): Unit = { + buildCaseInsensitiveStringMapForUpperAndLowerKey(key -> value).foreach { options => val scan = getKafkaDataSourceScan(options) val stream = scan.toContinuousStream("dummy").asInstanceOf[KafkaContinuousStream] - assert(expectedPollTimeoutMs === getField(stream, pollTimeoutMsMethod)) + validate(stream) } - - val expectedValue = 1000 - buildCaseInsensitiveStringMapForUpperAndLowerKey( - KafkaSourceProvider.CONSUMER_POLL_TIMEOUT -> expectedValue.toString) - .foreach(verifyFieldsInContinuousStream(_, expectedValue)) } private def buildCaseInsensitiveStringMapForUpperAndLowerKey( From d20b85ac6297f7d79b514a2c4b617f287a00e496 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 5 Jul 2019 10:36:59 +0200 Subject: [PATCH 3/9] Change expected --- .../apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index 789e9c8a8b276..4bee10b651fdb 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { - private val expected = "666" + private val expected = "1111" private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs) private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger) private val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader) From 37239307320538667eb54d4ce074c52cffa16eb5 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 8 
Jul 2019 13:55:30 +0200 Subject: [PATCH 4/9] Test cleanup :) --- .../spark/sql/kafka010/KafkaSourceProviderSuite.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index 4bee10b651fdb..910d29d1dc5a0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -55,9 +55,6 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { verifyFieldsInMicroBatchStream(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER, expected, stream => { assert(Some(expected.toLong) === getField(stream, maxOffsetsPerTriggerMethod)) }) - } - - test("SPARK-28163 - micro-batch mode - options should be handled as case-insensitive") { verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { val kafkaOffsetReader = getField(stream, offsetReaderMethod) assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod)) @@ -69,13 +66,10 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { }) } - test("SPARK-28142 - continuous mode - options should be handled as case-insensitive") { + test("continuous mode - options should be handled as case-insensitive") { verifyFieldsInContinuousStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) }) - } - - test("SPARK-28163 - continuous mode - options should be handled as case-insensitive") { verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { val kafkaOffsetReader = getField(stream, offsetReaderMethod) assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod)) From dcc9d56c12a5d2a11d848e98f5017ef918475eb4 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 9 Jul 2019 14:07:49 +0200 Subject: [PATCH 5/9] Rollback KafkaOffsetReader renames --- .../spark/sql/kafka010/KafkaOffsetReader.scala | 16 ++++++++-------- .../sql/kafka010/KafkaSourceProviderSuite.scala | 15 ++++++++------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 13fce488c71ef..5c90cfad5e6b4 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -89,10 +89,10 @@ private[kafka010] class KafkaOffsetReader( _consumer } - private val fetchOffsetNumRetries = + private val maxOffsetFetchAttempts = readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt - private val fetchOffsetRetryIntervalMs = + private val offsetFetchAttemptIntervalMs = readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong private def nextGroupId(): String = { @@ -294,12 +294,12 @@ private[kafka010] class KafkaOffsetReader( if (incorrectOffsets.nonEmpty) { logWarning("Found incorrect offsets in some partitions " + s"(partition, previous offset, fetched offset): $incorrectOffsets") - if (attempt < 
fetchOffsetNumRetries) { + if (attempt < maxOffsetFetchAttempts) { logWarning("Retrying to fetch latest offsets because of incorrect offsets") - Thread.sleep(fetchOffsetRetryIntervalMs) + Thread.sleep(offsetFetchAttemptIntervalMs) } } - } while (incorrectOffsets.nonEmpty && attempt < fetchOffsetNumRetries) + } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) logDebug(s"Got latest offsets for partition : $partitionOffsets") partitionOffsets @@ -372,7 +372,7 @@ private[kafka010] class KafkaOffsetReader( var result: Option[Map[TopicPartition, Long]] = None var attempt = 1 var lastException: Throwable = null - while (result.isEmpty && attempt <= fetchOffsetNumRetries + while (result.isEmpty && attempt <= maxOffsetFetchAttempts && !Thread.currentThread().isInterrupted) { Thread.currentThread match { case ut: UninterruptibleThread => @@ -390,7 +390,7 @@ private[kafka010] class KafkaOffsetReader( lastException = e logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e) attempt += 1 - Thread.sleep(fetchOffsetRetryIntervalMs) + Thread.sleep(offsetFetchAttemptIntervalMs) resetConsumer() } } @@ -403,7 +403,7 @@ private[kafka010] class KafkaOffsetReader( throw new InterruptedException() } if (result.isEmpty) { - assert(attempt > fetchOffsetNumRetries) + assert(attempt > maxOffsetFetchAttempts) assert(lastException != null) throw lastException } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index 910d29d1dc5a0..7c304ab0096c9 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import scala.collection.JavaConverters._ import org.mockito.Mockito.{mock, when} -import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} +import org.scalatest.{PrivateMethodTester} import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} import org.apache.spark.sql.sources.v2.reader.Scan @@ -34,8 +34,9 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs) private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger) private val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader) - private val fetchOffsetNumRetriesMethod = PrivateMethod[Int]('fetchOffsetNumRetries) - private val fetchOffsetRetryIntervalMsMethod = PrivateMethod[Long]('fetchOffsetRetryIntervalMs) + private val maxOffsetFetchAttemptsMethod = PrivateMethod[Int]('maxOffsetFetchAttempts) + private val offsetFetchAttemptIntervalMsMethod = + PrivateMethod[Long]('offsetFetchAttemptIntervalMs) override protected def afterEach(): Unit = { SparkEnv.set(null) @@ -57,12 +58,12 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod)) + assert(expected.toInt === getField(kafkaOffsetReader, maxOffsetFetchAttemptsMethod)) }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { val kafkaOffsetReader = 
getField(stream, offsetReaderMethod) - assert(expected.toLong === getField(kafkaOffsetReader, fetchOffsetRetryIntervalMsMethod)) + assert(expected.toLong === getField(kafkaOffsetReader, offsetFetchAttemptIntervalMsMethod)) }) } @@ -72,12 +73,12 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { }) verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod)) + assert(expected.toInt === getField(kafkaOffsetReader, maxOffsetFetchAttemptsMethod)) }) verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toLong === getField(kafkaOffsetReader, fetchOffsetRetryIntervalMsMethod)) + assert(expected.toLong === getField(kafkaOffsetReader, offsetFetchAttemptIntervalMsMethod)) }) } From 5cd7b8e559d4a45c0ace8267b0ba87bb9f5303ad Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 9 Jul 2019 14:12:08 +0200 Subject: [PATCH 6/9] Rollback KafkaMicroBatchStream renames --- .../sql/kafka010/KafkaMicroBatchStream.scala | 20 +++++++++---------- .../kafka010/KafkaSourceProviderSuite.scala | 5 ++++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index f3c877ce7b566..a3644d260b0f1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.UninterruptibleThread * and not use wrong broker addresses. */ private[kafka010] class KafkaMicroBatchStream( - private val offsetReader: KafkaOffsetReader, + private val kafkaOffsetReader: KafkaOffsetReader, executorKafkaParams: ju.Map[String, Object], options: CaseInsensitiveStringMap, metadataPath: String, @@ -85,7 +85,7 @@ private[kafka010] class KafkaMicroBatchStream( override def latestOffset(start: Offset): Offset = { val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets - val latestPartitionOffsets = offsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) + val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets => rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets) }.getOrElse { @@ -100,7 +100,7 @@ private[kafka010] class KafkaMicroBatchStream( // Find the new partitions, and get their earliest offsets val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet) - val newPartitionInitialOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) + val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq) if (newPartitionInitialOffsets.keySet != newPartitions) { // We cannot get from offsets for some partitions. It means they got deleted. 
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet) @@ -117,7 +117,7 @@ private[kafka010] class KafkaMicroBatchStream( val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet) if (deletedPartitions.nonEmpty) { val message = - if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}" } else { s"$deletedPartitions are gone. Some data may have been missed." @@ -172,10 +172,10 @@ private[kafka010] class KafkaMicroBatchStream( override def commit(end: Offset): Unit = {} override def stop(): Unit = { - offsetReader.close() + kafkaOffsetReader.close() } - override def toString(): String = s"KafkaV2[$offsetReader]" + override def toString(): String = s"KafkaV2[$kafkaOffsetReader]" /** * Read initial partition offsets from the checkpoint, or decide the offsets and write them to @@ -195,11 +195,11 @@ private[kafka010] class KafkaMicroBatchStream( metadataLog.get(0).getOrElse { val offsets = startingOffsets match { case EarliestOffsetRangeLimit => - KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) + KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => - KafkaSourceOffset(offsetReader.fetchLatestOffsets(None)) + KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => - offsetReader.fetchSpecificOffsets(p, reportDataLoss) + kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") @@ -212,7 +212,7 @@ private[kafka010] class KafkaMicroBatchStream( limit: Long, from: PartitionOffsetMap, until: PartitionOffsetMap): PartitionOffsetMap = { - val fromNew = offsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq) + val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq) val sizes = until.flatMap { case (tp, end) => // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index 7c304ab0096c9..681a2b82a9bff 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -33,7 +33,6 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { private val expected = "1111" private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs) private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger) - private val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader) private val maxOffsetFetchAttemptsMethod = PrivateMethod[Int]('maxOffsetFetchAttempts) private val offsetFetchAttemptIntervalMsMethod = PrivateMethod[Long]('offsetFetchAttemptIntervalMs) @@ -50,6 +49,8 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { } test("micro-batch mode - options should be handled as case-insensitive") { + val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('kafkaOffsetReader) + verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { 
assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) }) @@ -68,6 +69,8 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { } test("continuous mode - options should be handled as case-insensitive") { + val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader) + verifyFieldsInContinuousStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) }) From 06fe0fc50c4fdf70a1ab2340c6c83aae45faad3c Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 18 Jul 2019 15:30:38 +0200 Subject: [PATCH 7/9] Making offsetReader private[kafka010] --- .../sql/kafka010/KafkaContinuousStream.scala | 2 +- .../sql/kafka010/KafkaMicroBatchStream.scala | 2 +- .../sql/kafka010/KafkaSourceProviderSuite.scala | 17 +++++------------ 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 9489b7f5e9ecb..b0678a5b40758 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * properly read. */ class KafkaContinuousStream( - private val offsetReader: KafkaOffsetReader, + private[kafka010] val offsetReader: KafkaOffsetReader, kafkaParams: ju.Map[String, Object], options: CaseInsensitiveStringMap, metadataPath: String, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index a3644d260b0f1..6229537d34cda 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.UninterruptibleThread * and not use wrong broker addresses. 
*/ private[kafka010] class KafkaMicroBatchStream( - private val kafkaOffsetReader: KafkaOffsetReader, + private[kafka010] val kafkaOffsetReader: KafkaOffsetReader, executorKafkaParams: ju.Map[String, Object], options: CaseInsensitiveStringMap, metadataPath: String, diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index 681a2b82a9bff..af1aff252a2e4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -49,8 +49,6 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { } test("micro-batch mode - options should be handled as case-insensitive") { - val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('kafkaOffsetReader) - verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) }) @@ -58,30 +56,25 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { assert(Some(expected.toLong) === getField(stream, maxOffsetsPerTriggerMethod)) }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { - val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toInt === getField(kafkaOffsetReader, maxOffsetFetchAttemptsMethod)) + assert(expected.toInt === getField(stream.kafkaOffsetReader, maxOffsetFetchAttemptsMethod)) }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { - val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toLong === getField(kafkaOffsetReader, offsetFetchAttemptIntervalMsMethod)) + assert(expected.toLong === getField(stream.kafkaOffsetReader, + offsetFetchAttemptIntervalMsMethod)) }) } test("continuous mode - options should be handled as case-insensitive") { - val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader) - verifyFieldsInContinuousStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) }) verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { - val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toInt === getField(kafkaOffsetReader, maxOffsetFetchAttemptsMethod)) + assert(expected.toInt === getField(stream.offsetReader, maxOffsetFetchAttemptsMethod)) }) verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { - val kafkaOffsetReader = getField(stream, offsetReaderMethod) - assert(expected.toLong === getField(kafkaOffsetReader, offsetFetchAttemptIntervalMsMethod)) + assert(expected.toLong === getField(stream.offsetReader, offsetFetchAttemptIntervalMsMethod)) }) } From 903489494f509659ead6eb1f9bb41ebc8351cab2 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 6 Aug 2019 13:52:45 +0200 Subject: [PATCH 8/9] Remove private method testing approach --- .../spark/sql/kafka010/KafkaBatch.scala | 2 +- .../sql/kafka010/KafkaContinuousStream.scala | 2 +- .../sql/kafka010/KafkaMicroBatchStream.scala | 6 ++-- .../sql/kafka010/KafkaOffsetReader.scala | 4 +-- .../kafka010/KafkaSourceProviderSuite.scala | 29 ++++++------------- 5 files changed, 16 
insertions(+), 27 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala index f89be1410ff6a..839a64ed31322 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala @@ -39,7 +39,7 @@ private[kafka010] class KafkaBatch( assert(endingOffsets != EarliestOffsetRangeLimit, "Ending offset not allowed to be set to earliest offsets.") - private val pollTimeoutMs = sourceOptions.getOrElse( + private[kafka010] val pollTimeoutMs = sourceOptions.getOrElse( KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, (SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L).toString ).toLong diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index b0678a5b40758..18d740eaa968f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -54,7 +54,7 @@ class KafkaContinuousStream( failOnDataLoss: Boolean) extends ContinuousStream with Logging { - private val pollTimeoutMs = + private[kafka010] val pollTimeoutMs = options.getLong(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, 512) // Initialized when creating reader factories. If this diverges from the partitions at the latest diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 6229537d34cda..57cb0fae41841 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -63,12 +63,12 @@ private[kafka010] class KafkaMicroBatchStream( startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging { - private val pollTimeoutMs = options.getLong( + private[kafka010] val pollTimeoutMs = options.getLong( KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L) - private val maxOffsetsPerTrigger = Option(options.get(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)) - .map(_.toLong) + private[kafka010] val maxOffsetsPerTrigger = Option(options.get( + KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)).map(_.toLong) private val rangeCalculator = KafkaOffsetRangeCalculator(options) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 5c90cfad5e6b4..f3effd5300a79 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -89,10 +89,10 @@ private[kafka010] class KafkaOffsetReader( _consumer } - private val maxOffsetFetchAttempts = + private[kafka010] val maxOffsetFetchAttempts = readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt - private val offsetFetchAttemptIntervalMs = + private[kafka010] val 
offsetFetchAttemptIntervalMs = readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong private def nextGroupId(): String = { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index af1aff252a2e4..be5af1a0886dd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -22,20 +22,14 @@ import java.util.Locale import scala.collection.JavaConverters._ import org.mockito.Mockito.{mock, when} -import org.scalatest.{PrivateMethodTester} import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} import org.apache.spark.sql.sources.v2.reader.Scan import org.apache.spark.sql.util.CaseInsensitiveStringMap -class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { +class KafkaSourceProviderSuite extends SparkFunSuite { private val expected = "1111" - private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs) - private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger) - private val maxOffsetFetchAttemptsMethod = PrivateMethod[Int]('maxOffsetFetchAttempts) - private val offsetFetchAttemptIntervalMsMethod = - PrivateMethod[Long]('offsetFetchAttemptIntervalMs) override protected def afterEach(): Unit = { SparkEnv.set(null) @@ -44,37 +38,36 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { test("batch mode - options should be handled as case-insensitive") { verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, batch => { - assert(expected.toLong === getField(batch, pollTimeoutMsMethod)) + assert(expected.toLong === batch.pollTimeoutMs) }) } test("micro-batch mode - options should be handled as case-insensitive") { verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { - assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) + assert(expected.toLong === stream.pollTimeoutMs) }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER, expected, stream => { - assert(Some(expected.toLong) === getField(stream, maxOffsetsPerTriggerMethod)) + assert(Some(expected.toLong) === stream.maxOffsetsPerTrigger) }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { - assert(expected.toInt === getField(stream.kafkaOffsetReader, maxOffsetFetchAttemptsMethod)) + assert(expected.toInt === stream.kafkaOffsetReader.maxOffsetFetchAttempts) }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { - assert(expected.toLong === getField(stream.kafkaOffsetReader, - offsetFetchAttemptIntervalMsMethod)) + assert(expected.toLong === stream.kafkaOffsetReader.offsetFetchAttemptIntervalMs)) }) } test("continuous mode - options should be handled as case-insensitive") { verifyFieldsInContinuousStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => { - assert(expected.toLong === getField(stream, pollTimeoutMsMethod)) + assert(expected.toLong === stream.pollTimeoutMs) }) verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => { - assert(expected.toInt === getField(stream.offsetReader, maxOffsetFetchAttemptsMethod)) + assert(expected.toInt === 
stream.offsetReader.maxOffsetFetchAttempts) }) verifyFieldsInContinuousStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { - assert(expected.toLong === getField(stream.offsetReader, offsetFetchAttemptIntervalMsMethod)) + assert(expected.toLong === stream.offsetReader.offsetFetchAttemptIntervalMs) }) } @@ -134,8 +127,4 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { val provider = new KafkaSourceProvider() provider.getTable(options).newScanBuilder(options).build() } - - private def getField[T](obj: AnyRef, method: PrivateMethod[T]): T = { - obj.invokePrivate(method()) - } } From e0e8a8851f975311be990c5a4889d4c9870c3bea Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 8 Aug 2019 13:49:08 +0200 Subject: [PATCH 9/9] Compile fix --- .../apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala index be5af1a0886dd..8e6de88865e06 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala @@ -54,7 +54,7 @@ class KafkaSourceProviderSuite extends SparkFunSuite { }) verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, expected, stream => { - assert(expected.toLong === stream.kafkaOffsetReader.offsetFetchAttemptIntervalMs)) + assert(expected.toLong === stream.kafkaOffsetReader.offsetFetchAttemptIntervalMs) }) }