diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index 55acec53302e..26a83e52de2b 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -64,13 +64,13 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.kinesis.KinesisInputDStream
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.streaming.StreamingContext
- import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+ import org.apache.spark.storage.StorageLevel;
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+ import org.apache.spark.streaming.Seconds;
+ import org.apache.spark.streaming.StreamingContext;
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
- KinesisInputDStream kinesisStream = KinesisInputDStream.builder
+ KinesisInputDStream kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
@@ -98,14 +98,21 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
- You may also provide a "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. This is currently only supported in Scala and Java.
+ You may also provide the following settings. These are currently only supported in Scala and Java.
+
+ - A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key.
+
+ - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details.
+ import scala.collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
@@ -116,17 +123,22 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .metricsLevel(MetricsLevel.DETAILED)
+ .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
.buildWithMessageHandler([message handler])
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.kinesis.KinesisInputDStream
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.streaming.StreamingContext
- import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-
- KinesisInputDStream kinesisStream = KinesisInputDStream.builder
+ import org.apache.spark.storage.StorageLevel;
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+ import org.apache.spark.streaming.Seconds;
+ import org.apache.spark.streaming.StreamingContext;
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
+ import scala.collection.JavaConverters;
+
+ KinesisInputDStream kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
@@ -135,6 +147,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .metricsLevel(MetricsLevel.DETAILED)
+ .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
.buildWithMessageHandler([message handler]);
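Taken together, the documented options compose as in the following minimal sketch; the stream name, checkpoint app name, endpoint, and region are hypothetical placeholders, and the message handler simply keeps the partition key alongside the payload size.

```scala
import scala.collection.JavaConverters._

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

object KinesisMetricsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("KinesisMetricsSketch"), Seconds(10))

    // "my-stream", "my-app", and the endpoint/region are placeholders.
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("my-stream")
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
      .regionName("us-east-1")
      .initialPosition(new KinesisInitialPositions.Latest())
      .checkpointAppName("my-app")
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      // SUMMARY publishes fewer CloudWatch metrics than the KCL default (DETAILED).
      .metricsLevel(MetricsLevel.SUMMARY)
      .metricsEnabledDimensions(
        KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet)
      // Message handler: keep the partition key alongside the payload size.
      .buildWithMessageHandler(r => (r.getPartitionKey, r.getData.remaining()))

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```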
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 608da0b8bf56..8c3931a1c87f 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis
import scala.reflect.ClassTag
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import scala.collection.JavaConverters._
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
+import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.rdd.RDD
@@ -43,7 +45,9 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
val messageHandler: Record => T,
val kinesisCreds: SparkAWSCredentials,
val dynamoDBCreds: Option[SparkAWSCredentials],
- val cloudWatchCreds: Option[SparkAWSCredentials]
+ val cloudWatchCreds: Option[SparkAWSCredentials],
+ val metricsLevel: MetricsLevel,
+ val metricsEnabledDimensions: Set[String]
) extends ReceiverInputDStream[T](_ssc) {
import KinesisReadConfigurations._
@@ -79,7 +83,8 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
- kinesisCreds, dynamoDBCreds, cloudWatchCreds)
+ kinesisCreds, dynamoDBCreds, cloudWatchCreds,
+ metricsLevel, metricsEnabledDimensions)
}
}
@@ -104,6 +109,8 @@ object KinesisInputDStream {
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None
private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None
+ private var metricsLevel: Option[MetricsLevel] = None
+ private var metricsEnabledDimensions: Option[Set[String]] = None
/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
@@ -237,6 +244,7 @@ object KinesisInputDStream {
* endpoint. Defaults to [[DefaultCredentialsProvider]] if no custom value is specified.
*
* @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication
+ * @return Reference to this [[KinesisInputDStream.Builder]]
*/
def kinesisCredentials(credentials: SparkAWSCredentials): Builder = {
kinesisCredsProvider = Option(credentials)
@@ -248,6 +256,7 @@ object KinesisInputDStream {
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication
+ * @return Reference to this [[KinesisInputDStream.Builder]]
*/
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = {
dynamoDBCredsProvider = Option(credentials)
@@ -259,12 +268,43 @@ object KinesisInputDStream {
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication
+ * @return Reference to this [[KinesisInputDStream.Builder]]
*/
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = {
cloudWatchCredsProvider = Option(credentials)
this
}
+ /**
+ * Sets the CloudWatch metrics level. Defaults to
+ * [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified.
+ *
+ * @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ * @see
+ * [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
+ */
+ def metricsLevel(metricsLevel: MetricsLevel): Builder = {
+ this.metricsLevel = Option(metricsLevel)
+ this
+ }
+
+ /**
+ * Sets the enabled CloudWatch metrics dimensions. Defaults to
+ * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]]
+ * if no custom value is specified.
+ *
+ * @param metricsEnabledDimensions Set[String] to specify which CloudWatch metrics dimensions
+ * should be enabled
+ * @return Reference to this [[KinesisInputDStream.Builder]]
+ * @see
+ * [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
+ */
+ def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
+ this.metricsEnabledDimensions = Option(metricsEnabledDimensions)
+ this
+ }
+
/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided
* message handler.
@@ -287,7 +327,9 @@ object KinesisInputDStream {
ssc.sc.clean(handler),
kinesisCredsProvider.getOrElse(DefaultCredentials),
dynamoDBCredsProvider,
- cloudWatchCredsProvider)
+ cloudWatchCredsProvider,
+ metricsLevel.getOrElse(DEFAULT_METRICS_LEVEL),
+ metricsEnabledDimensions.getOrElse(DEFAULT_METRICS_ENABLED_DIMENSIONS))
}
/**
@@ -324,4 +366,8 @@ object KinesisInputDStream {
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
+ private[kinesis] val DEFAULT_METRICS_LEVEL: MetricsLevel =
+ KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
+ private[kinesis] val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] =
+ KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
}
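Note the builder holds both new settings as `Option`s and resolves them against the KCL defaults only at build time, so either can be overridden independently and a `null` passed from Java degrades to the default. A stripped-down sketch of that pattern (illustrative, not the Spark class itself):

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

// Illustrative only: mirrors how the builder defers defaulting to build time.
class MetricsSettings {
  private var metricsLevel: Option[MetricsLevel] = None

  def metricsLevel(level: MetricsLevel): MetricsSettings = {
    // Option(...) maps a null from Java callers to None rather than Some(null).
    this.metricsLevel = Option(level)
    this
  }

  def resolvedLevel: MetricsLevel =
    metricsLevel.getOrElse(KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL)
}
```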
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 69c52365b1bf..9ea7d4081928 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
+import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.internal.Logging
@@ -92,7 +93,9 @@ private[kinesis] class KinesisReceiver[T](
messageHandler: Record => T,
kinesisCreds: SparkAWSCredentials,
dynamoDBCreds: Option[SparkAWSCredentials],
- cloudWatchCreds: Option[SparkAWSCredentials])
+ cloudWatchCreds: Option[SparkAWSCredentials],
+ metricsLevel: MetricsLevel,
+ metricsEnabledDimensions: Set[String])
extends Receiver[T](storageLevel) with Logging { receiver =>
/*
@@ -162,6 +165,8 @@ private[kinesis] class KinesisReceiver[T](
.withKinesisEndpoint(endpointUrl)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
+ .withMetricsLevel(metricsLevel)
+ .withMetricsEnabledDimensions(metricsEnabledDimensions.asJava)
// Update the Kinesis client lib config with timestamp
// if InitialPositionInStream.AT_TIMESTAMP is passed
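Standing alone, the configuration chain the receiver now builds looks roughly like the sketch below; the application name, stream name, worker id, endpoint, and region are placeholders, and the credentials provider is assumed rather than taken from the change.

```scala
import scala.collection.JavaConverters._

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

object KclConfigSketch {
  // The receiver holds a Scala Set[String]; the KCL expects java.util.Set, hence .asJava.
  val enabledDimensions: Set[String] =
    KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet

  val kclConfig: KinesisClientLibConfiguration =
    new KinesisClientLibConfiguration(
        "my-app", "my-stream", new DefaultAWSCredentialsProviderChain(), "worker-1")
      .withKinesisEndpoint("https://kinesis.us-east-1.amazonaws.com")
      .withTaskBackoffTimeMillis(500)
      .withRegionName("us-east-1")
      // New in this change: CloudWatch metrics verbosity and dimension filtering.
      .withMetricsLevel(MetricsLevel.DETAILED)
      .withMetricsEnabledDimensions(enabledDimensions.asJava)
}
```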
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index c60b9896a347..4e796b2caec8 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -75,7 +75,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
- cleanedHandler, DefaultCredentials, None, None)
+ cleanedHandler, DefaultCredentials, None, None,
+ KinesisInputDStream.DEFAULT_METRICS_LEVEL,
+ KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}
@@ -132,7 +134,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
- cleanedHandler, kinesisCredsProvider, None, None)
+ cleanedHandler, kinesisCredsProvider, None, None,
+ KinesisInputDStream.DEFAULT_METRICS_LEVEL,
+ KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}
@@ -202,7 +206,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
- cleanedHandler, kinesisCredsProvider, None, None)
+ cleanedHandler, kinesisCredsProvider, None, None,
+ KinesisInputDStream.DEFAULT_METRICS_LEVEL,
+ KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}
@@ -248,7 +254,9 @@ object KinesisUtils {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
- KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None)
+ KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None,
+ KinesisInputDStream.DEFAULT_METRICS_LEVEL,
+ KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}
@@ -299,7 +307,9 @@ object KinesisUtils {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
- KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None)
+ KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None,
+ KinesisInputDStream.DEFAULT_METRICS_LEVEL,
+ KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
index 361520e29226..25357cb52ede 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis
import java.util.Calendar
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import scala.collection.JavaConverters._
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
+import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar
@@ -82,6 +84,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
assert(dstream.kinesisCreds == DefaultCredentials)
assert(dstream.dynamoDBCreds == None)
assert(dstream.cloudWatchCreds == None)
+ assert(dstream.metricsLevel == DEFAULT_METRICS_LEVEL)
+ assert(dstream.metricsEnabledDimensions == DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
test("should propagate custom non-auth values to KinesisInputDStream") {
@@ -94,6 +98,9 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
val customKinesisCreds = mock[SparkAWSCredentials]
val customDynamoDBCreds = mock[SparkAWSCredentials]
val customCloudWatchCreds = mock[SparkAWSCredentials]
+ val customMetricsLevel = MetricsLevel.NONE
+ val customMetricsEnabledDimensions =
+ KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet
val dstream = builder
.endpointUrl(customEndpointUrl)
@@ -105,6 +112,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
+ .metricsLevel(customMetricsLevel)
+ .metricsEnabledDimensions(customMetricsEnabledDimensions)
.build()
assert(dstream.endpointUrl == customEndpointUrl)
assert(dstream.regionName == customRegion)
@@ -115,6 +124,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
assert(dstream.kinesisCreds == customKinesisCreds)
assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+ assert(dstream.metricsLevel == customMetricsLevel)
+ assert(dstream.metricsEnabledDimensions == customMetricsEnabledDimensions)
// Testing with AtTimestamp
val cal = Calendar.getInstance()
@@ -132,6 +143,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
+ .metricsLevel(customMetricsLevel)
+ .metricsEnabledDimensions(customMetricsEnabledDimensions)
.build()
assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
assert(dstreamAtTimestamp.regionName == customRegion)
@@ -145,6 +158,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
+ assert(dstreamAtTimestamp.metricsLevel == customMetricsLevel)
+ assert(dstreamAtTimestamp.metricsEnabledDimensions == customMetricsEnabledDimensions)
}
test("old Api should throw UnsupportedOperationExceptionexception with AT_TIMESTAMP") {