KinesisBackedBlockRDD.scala

@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
@@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
     @transient private val _blockIds: Array[BlockId],
     @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
     @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-    val retryTimeoutMs: Int = 10000,
     val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+    val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations()
   ) extends BlockRDD[T](sc, _blockIds) {
 
   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
       val credentials = kinesisCreds.provider.getCredentials
       partition.seqNumberRanges.ranges.iterator.flatMap { range =>
         new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-          range, retryTimeoutMs).map(messageHandler)
+          range, kinesisReadConfigs).map(messageHandler)
       }
     }
     if (partition.isBlockIdValid) {
@@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator(
     endpointUrl: String,
     regionId: String,
     range: SequenceNumberRange,
-    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+    kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging {
 
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
@@ -251,21 +251,19 @@ class KinesisSequenceRangeIterator(
 
   /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-    import KinesisSequenceRangeIterator._
-
-    var startTimeMs = System.currentTimeMillis()
+    val startTimeMs = System.currentTimeMillis()
     var retryCount = 0
-    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
     var result: Option[T] = None
     var lastError: Throwable = null
+    var waitTimeInterval = kinesisReadConfigs.retryWaitTimeMs
 
-    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
-    def isMaxRetryDone = retryCount >= MAX_RETRIES
+    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= kinesisReadConfigs.retryTimeoutMs
+    def isMaxRetryDone = retryCount >= kinesisReadConfigs.maxRetries
 
     while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
       if (retryCount > 0) { // wait only if this is a retry
-        Thread.sleep(waitTimeMs)
-        waitTimeMs *= 2 // if you have waited, then double wait time for next round
+        Thread.sleep(waitTimeInterval)
+        waitTimeInterval *= 2 // if you have waited, then double wait time for next round
       }
       try {
         result = Some(body)
@@ -284,17 +282,12 @@ class KinesisSequenceRangeIterator(
     result.getOrElse {
       if (isTimedOut) {
         throw new SparkException(
-          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
+          s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while " +
+            s"$message, last exception: ", lastError)
       } else {
         throw new SparkException(
           s"Gave up after $retryCount retries while $message, last exception: ", lastError)
       }
     }
   }
 }
-
-private[streaming]
-object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
-}
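The retry policy above is simple to restate in isolation: make the first attempt immediately, sleep before each retry with a doubling wait, and give up on whichever limit trips first, `kinesisReadConfigs.maxRetries` or the overall `kinesisReadConfigs.retryTimeoutMs` budget. Below is a minimal standalone sketch of that policy; the name `retryWithBackoff` and its parameters are illustrative, and it treats any non-fatal error as retryable, whereas the collapsed `catch` clause in the diff may be more selective.

```scala
import scala.util.control.NonFatal

object RetrySketch {
  // Retry `body` with exponentially doubling waits, giving up after either
  // `maxRetries` failed attempts or `timeoutMs` total elapsed time.
  def retryWithBackoff[T](maxRetries: Int, initialWaitMs: Long, timeoutMs: Long)(body: => T): T = {
    val startTimeMs = System.currentTimeMillis()
    var waitTimeMs = initialWaitMs
    var retryCount = 0
    var result: Option[T] = None
    var lastError: Throwable = null

    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= timeoutMs

    while (result.isEmpty && !isTimedOut && retryCount < maxRetries) {
      if (retryCount > 0) {   // wait only before a retry, never before the first attempt
        Thread.sleep(waitTimeMs)
        waitTimeMs *= 2       // exponential backoff: double the wait each round
      }
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>   // assumption: any non-fatal error is retryable
          lastError = e
          retryCount += 1
      }
    }
    result.getOrElse {
      val reason =
        if (isTimedOut) s"timed out after $timeoutMs ms" else s"gave up after $retryCount retries"
      throw new RuntimeException(s"Request failed: $reason", lastError)
    }
  }
}
```

Called as `retryWithBackoff(3, 100, 10000) { /* Kinesis request */ }`, this reproduces the hard-coded pre-patch behavior (`MAX_RETRIES = 3`, `MIN_RETRY_WAIT_TIME_MS = 100`, 10000 ms timeout) that the patch promotes into `KinesisReadConfigurations` fields.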
KinesisInputDStream.scala

@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
+import KinesisReadConfigurations._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
@@ -60,12 +61,13 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
       logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
         s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+
       new KinesisBackedBlockRDD(
         context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
         isBlockIdValid = isBlockIdValid,
-        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
         messageHandler = messageHandler,
-        kinesisCreds = kinesisCreds)
+        kinesisCreds = kinesisCreds,
+        kinesisReadConfigs = KinesisReadConfigurations(ssc))
     } else {
       logWarning("Kinesis sequence number information was not present with some block metadata," +
         " it may not be possible to recover from failures")
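One behavioral detail in this hunk: the effective read timeout is still the batch duration, but it now travels as a `Long` inside `KinesisReadConfigurations(ssc)` instead of being narrowed through `.toInt` as the removed `retryTimeoutMs` line did, so very long batch intervals no longer risk integer truncation.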
KinesisReadConfigurations.scala

@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [[KinesisBackedBlockRDD]].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consecutive Kinesis retries.
+ *                         Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+ *                        Defaults to the batch duration provided for streaming,
+ *                        else uses 10000 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+    maxRetries: Int,
+    retryWaitTimeMs: Long,
+    retryTimeoutMs: Long)
+
+private[kinesis] object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
+    KinesisReadConfigurations(maxRetries = DEFAULT_MAX_RETRIES,
+      retryWaitTimeMs = JavaUtils.timeStringAsMs(DEFAULT_RETRY_WAIT_TIME),
+      retryTimeoutMs = DEFAULT_RETRY_TIMEOUT)
+  }
+
+  def apply(ssc: StreamingContext): KinesisReadConfigurations = {
+    KinesisReadConfigurations(
+      maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
+      retryWaitTimeMs = JavaUtils.timeStringAsMs(
+        ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
+      retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
+  }
+
+  /**
+   * SparkConf key for configuring the maximum number of retries used when attempting a Kinesis
+   * request.
+   */
+  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
+
+  /**
+   * SparkConf key for configuring the wait time to use before retrying a Kinesis attempt.
+   */
+  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
+
+  /**
+   * Default value for the RETRY_MAX_ATTEMPTS_KEY
+   */
+  val DEFAULT_MAX_RETRIES = 3
+
+  /**
+   * Default value for the RETRY_WAIT_TIME_KEY
+   */
+  val DEFAULT_RETRY_WAIT_TIME = "100ms"
+
+  /**
+   * Default value for the retry timeout
+   */
+  val DEFAULT_RETRY_TIMEOUT = 10000
+}
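For users, the practical surface of this new file is the two `SparkConf` keys. Below is a hedged sketch of driver-side usage; the app name, master, and batch interval are placeholders, while the keys and the time-string format parsed by `JavaUtils.timeStringAsMs` come from the object above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Tune Kinesis retry behavior through SparkConf before building the stream.
val conf = new SparkConf()
  .setAppName("KinesisRetryExample")                      // placeholder app name
  .setMaster("local[2]")                                  // placeholder master
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")  // RETRY_MAX_ATTEMPTS_KEY
  .set("spark.streaming.kinesis.retry.waitTime", "500ms") // RETRY_WAIT_TIME_KEY, a time string

val ssc = new StreamingContext(conf, Seconds(2))

// With these settings, KinesisReadConfigurations(ssc) resolves to
// maxRetries = 5, retryWaitTimeMs = 500, and retryTimeoutMs = 2000
// (the 2-second batch duration in milliseconds).
```

Unset keys fall back to `DEFAULT_MAX_RETRIES` and `DEFAULT_RETRY_WAIT_TIME`, and the no-argument `KinesisReadConfigurations()` used by the RDD's default parameter keeps the old hard-coded behavior (3 retries, 100 ms wait, 10000 ms timeout).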
KinesisStreamSuite.scala

@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
 import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
@@ -136,7 +137,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
     assert(kinesisRDD.regionName === dummyRegionName)
     assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
-    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+    assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
     assert(kinesisRDD.kinesisCreds === BasicCredentials(
       awsAccessKeyId = dummyAWSAccessKey,
       awsSecretKey = dummyAWSSecretKey))
@@ -234,6 +235,52 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.stop(stopSparkContext = false)
   }
 
+  test("Kinesis read with custom configurations") {
+    try {
+      ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
+      ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
+
+      val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+        .checkpointAppName(appName)
+        .streamName("dummyStream")
+        .endpointUrl(dummyEndpointUrl)
+        .regionName(dummyRegionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(Seconds(10))
+        .storageLevel(StorageLevel.MEMORY_ONLY)
+        .build()
+        .asInstanceOf[KinesisInputDStream[Array[Byte]]]
+
+      val time = Time(1000)
+      // Generate block info data for testing
+      val seqNumRanges1 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+      val blockId1 = StreamBlockId(kinesisStream.id, 123)
+      val blockInfo1 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+      val seqNumRanges2 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+      val blockId2 = StreamBlockId(kinesisStream.id, 345)
+      val blockInfo2 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+      // Verify that the generated KinesisBackedBlockRDD has all the right information
+      val blockInfos = Seq(blockInfo1, blockInfo2)
+
+      val kinesisRDD =
+        kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
+
+      assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
+      assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
+      assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
+    } finally {
+      ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
+      ssc.sc.conf.remove(RETRY_MAX_ATTEMPTS_KEY)
+      ssc.stop(stopSparkContext = false)
+    }
+  }
+
   testIfEnabled("split and merge shards in a stream") {
     // Since this test tries to split and merge shards in a stream, we create another
     // temporary stream and then remove it when finished.