From c4ada94fc389fef794392f432f267d281bb8720d Mon Sep 17 00:00:00 2001 From: Kengo Seki Date: Mon, 20 May 2019 16:44:02 +0900 Subject: [PATCH 1/4] [SPARK-27420][DSTREAMS][Kinesis] KinesisInputDStream should expose a way to configure CloudWatch metrics KinesisInputDStream currently does not provide a way to disable CloudWatch metrics push. Its default level is "DETAILED" which pushes tens of metrics every 10 seconds. When dealing with multiple streaming jobs this adds up pretty quickly, leading to thousands of dollars in cost. To address this problem, this PR adds interfaces for accessing KinesisClientLibConfiguration's `withMetrics` and `withMetricsEnabledDimensions` methods to KinesisInputDStream so that users can configure KCL's metrics levels and dimensions. --- .../kinesis/KinesisInputDStream.scala | 52 +++++++++++++++++-- .../streaming/kinesis/KinesisReceiver.scala | 7 ++- .../streaming/kinesis/KinesisUtils.scala | 20 +++++-- .../KinesisInputDStreamBuilderSuite.scala | 17 +++++- 4 files changed, 85 insertions(+), 11 deletions(-) diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index d4a428f45c11..b7549e990bb5 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis import scala.reflect.ClassTag -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import collection.JavaConverters._ +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration} +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import com.amazonaws.services.kinesis.model.Record import 
org.apache.spark.annotation.Evolving @@ -44,7 +46,9 @@ private[kinesis] class KinesisInputDStream[T: ClassTag]( val messageHandler: Record => T, val kinesisCreds: SparkAWSCredentials, val dynamoDBCreds: Option[SparkAWSCredentials], - val cloudWatchCreds: Option[SparkAWSCredentials] + val cloudWatchCreds: Option[SparkAWSCredentials], + val metricsLevel: MetricsLevel, + val metricsEnabledDimensions: Set[String] ) extends ReceiverInputDStream[T](_ssc) { import KinesisReadConfigurations._ @@ -80,7 +84,8 @@ private[kinesis] class KinesisInputDStream[T: ClassTag]( override def getReceiver(): Receiver[T] = { new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition, checkpointAppName, checkpointInterval, _storageLevel, messageHandler, - kinesisCreds, dynamoDBCreds, cloudWatchCreds) + kinesisCreds, dynamoDBCreds, cloudWatchCreds, + metricsLevel, metricsEnabledDimensions) } } @@ -107,6 +112,8 @@ object KinesisInputDStream { private var kinesisCredsProvider: Option[SparkAWSCredentials] = None private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None + private var metricsLevel: Option[MetricsLevel] = None + private var metricsEnabledDimensions: Option[Set[String]] = None /** * Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a @@ -240,6 +247,7 @@ object KinesisInputDStream { * endpoint. Defaults to [[DefaultCredentialsProvider]] if no custom value is specified. * * @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication + * @return Reference to this [[KinesisInputDStream.Builder]] */ def kinesisCredentials(credentials: SparkAWSCredentials): Builder = { kinesisCredsProvider = Option(credentials) @@ -251,6 +259,7 @@ object KinesisInputDStream { * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set. 
* * @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication + * @return Reference to this [[KinesisInputDStream.Builder]] */ def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = { dynamoDBCredsProvider = Option(credentials) @@ -262,12 +271,41 @@ object KinesisInputDStream { * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set. * * @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication + * @return Reference to this [[KinesisInputDStream.Builder]] */ def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = { cloudWatchCredsProvider = Option(credentials) this } + /** + * Sets the CloudWatch metrics level. Defaults to + * [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified. + * + * @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level + * @return Reference to this [[KinesisInputDStream.Builder]] + * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] + */ + def metricsLevel(metricsLevel: MetricsLevel): Builder = { + this.metricsLevel = Option(metricsLevel) + this + } + + /** + * Sets the enabled CloudWatch metrics dimensions. Defaults to + * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]] + * if no custom value is specified. + * + * @param metricsEnabledDimensions [[Set[String]]] to specify + * the enabled CloudWatch metrics dimensions + * @return Reference to this [[KinesisInputDStream.Builder]] + * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] + */ + def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = { + this.metricsEnabledDimensions = Option(metricsEnabledDimensions) + this + } + /** * Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided * message handler. 
@@ -290,7 +328,9 @@ object KinesisInputDStream { ssc.sc.clean(handler), kinesisCredsProvider.getOrElse(DefaultCredentials), dynamoDBCredsProvider, - cloudWatchCredsProvider) + cloudWatchCredsProvider, + metricsLevel.getOrElse(DEFAULT_METRICS_LEVEL), + metricsEnabledDimensions.getOrElse(DEFAULT_METRICS_ENABLED_DIMENSIONS)) } /** @@ -327,4 +367,8 @@ object KinesisInputDStream { private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1" private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest() private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2 + private[kinesis] val DEFAULT_METRICS_LEVEL: MetricsLevel = + KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL + private[kinesis] val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] = + KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet } diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 69c52365b1bf..9ea7d4081928 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker} +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import com.amazonaws.services.kinesis.model.Record import org.apache.spark.internal.Logging @@ -92,7 +93,9 @@ private[kinesis] class KinesisReceiver[T]( messageHandler: Record => T, kinesisCreds: SparkAWSCredentials, dynamoDBCreds: Option[SparkAWSCredentials], - cloudWatchCreds: 
Option[SparkAWSCredentials]) + cloudWatchCreds: Option[SparkAWSCredentials], + metricsLevel: MetricsLevel, + metricsEnabledDimensions: Set[String]) extends Receiver[T](storageLevel) with Logging { receiver => /* @@ -162,6 +165,8 @@ private[kinesis] class KinesisReceiver[T]( .withKinesisEndpoint(endpointUrl) .withTaskBackoffTimeMillis(500) .withRegionName(regionName) + .withMetricsLevel(metricsLevel) + .withMetricsEnabledDimensions(metricsEnabledDimensions.asJava) // Update the Kinesis client lib config with timestamp // if InitialPositionInStream.AT_TIMESTAMP is passed diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index c60b9896a347..4e796b2caec8 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -75,7 +75,9 @@ object KinesisUtils { new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, DefaultCredentials, None, None) + cleanedHandler, DefaultCredentials, None, None, + KinesisInputDStream.DEFAULT_METRICS_LEVEL, + KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS) } } @@ -132,7 +134,9 @@ object KinesisUtils { new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, kinesisCredsProvider, None, None) + cleanedHandler, kinesisCredsProvider, None, None, + KinesisInputDStream.DEFAULT_METRICS_LEVEL, + KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS) } } @@ -202,7 +206,9 @@ object KinesisUtils { new 
KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, kinesisCredsProvider, None, None) + cleanedHandler, kinesisCredsProvider, None, None, + KinesisInputDStream.DEFAULT_METRICS_LEVEL, + KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS) } } @@ -248,7 +254,9 @@ object KinesisUtils { new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), kinesisAppName, checkpointInterval, storageLevel, - KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None) + KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None, + KinesisInputDStream.DEFAULT_METRICS_LEVEL, + KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS) } } @@ -299,7 +307,9 @@ object KinesisUtils { new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream), kinesisAppName, checkpointInterval, storageLevel, - KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None) + KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None, + KinesisInputDStream.DEFAULT_METRICS_LEVEL, + KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS) } } diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala index 361520e29226..25357cb52ede 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala @@ -19,7 +19,9 @@ 
package org.apache.spark.streaming.kinesis import java.util.Calendar -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import collection.JavaConverters._ +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration} +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import org.scalatest.BeforeAndAfterEach import org.scalatest.mockito.MockitoSugar @@ -82,6 +84,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE assert(dstream.kinesisCreds == DefaultCredentials) assert(dstream.dynamoDBCreds == None) assert(dstream.cloudWatchCreds == None) + assert(dstream.metricsLevel == DEFAULT_METRICS_LEVEL) + assert(dstream.metricsEnabledDimensions == DEFAULT_METRICS_ENABLED_DIMENSIONS) } test("should propagate custom non-auth values to KinesisInputDStream") { @@ -94,6 +98,9 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE val customKinesisCreds = mock[SparkAWSCredentials] val customDynamoDBCreds = mock[SparkAWSCredentials] val customCloudWatchCreds = mock[SparkAWSCredentials] + val customMetricsLevel = MetricsLevel.NONE + val customMetricsEnabledDimensions = + KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet val dstream = builder .endpointUrl(customEndpointUrl) @@ -105,6 +112,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE .kinesisCredentials(customKinesisCreds) .dynamoDBCredentials(customDynamoDBCreds) .cloudWatchCredentials(customCloudWatchCreds) + .metricsLevel(customMetricsLevel) + .metricsEnabledDimensions(customMetricsEnabledDimensions) .build() assert(dstream.endpointUrl == customEndpointUrl) assert(dstream.regionName == customRegion) @@ -115,6 +124,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE assert(dstream.kinesisCreds == customKinesisCreds) assert(dstream.dynamoDBCreds == 
Option(customDynamoDBCreds)) assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds)) + assert(dstream.metricsLevel == customMetricsLevel) + assert(dstream.metricsEnabledDimensions == customMetricsEnabledDimensions) // Testing with AtTimestamp val cal = Calendar.getInstance() @@ -132,6 +143,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE .kinesisCredentials(customKinesisCreds) .dynamoDBCredentials(customDynamoDBCreds) .cloudWatchCredentials(customCloudWatchCreds) + .metricsLevel(customMetricsLevel) + .metricsEnabledDimensions(customMetricsEnabledDimensions) .build() assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl) assert(dstreamAtTimestamp.regionName == customRegion) @@ -145,6 +158,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds) assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds)) assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds)) + assert(dstreamAtTimestamp.metricsLevel == customMetricsLevel) + assert(dstreamAtTimestamp.metricsEnabledDimensions == customMetricsEnabledDimensions) } test("old Api should throw UnsupportedOperationExceptionexception with AT_TIMESTAMP") { From a7800e69e98333066447d58739338b34541d6da2 Mon Sep 17 00:00:00 2001 From: Kengo Seki Date: Mon, 19 Aug 2019 16:57:47 +0900 Subject: [PATCH 2/4] Address scalastyle errors --- .../kinesis/KinesisInputDStream.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index b7549e990bb5..1cd7fc914556 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -278,29 +278,33 @@ object KinesisInputDStream { this } + // scalastyle:off line.size.limit /** - * Sets the CloudWatch metrics level. Defaults to - * [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified. - * - * @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level - * @return Reference to this [[KinesisInputDStream.Builder]] - * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] - */ + * Sets the CloudWatch metrics level. Defaults to + * [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified. + * + * @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level + * @return Reference to this [[KinesisInputDStream.Builder]] + * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] + */ + // scalastyle:on line.size.limit def metricsLevel(metricsLevel: MetricsLevel): Builder = { this.metricsLevel = Option(metricsLevel) this } + // scalastyle:off line.size.limit /** - * Sets the enabled CloudWatch metrics dimensions. Defaults to - * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]] - * if no custom value is specified. - * - * @param metricsEnabledDimensions [[Set[String]]] to specify - * the enabled CloudWatch metrics dimensions - * @return Reference to this [[KinesisInputDStream.Builder]] - * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] - */ + * Sets the enabled CloudWatch metrics dimensions. Defaults to + * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]] + * if no custom value is specified. 
+ * + * @param metricsEnabledDimensions [[Set[String]]] to specify + * the enabled CloudWatch metrics dimensions + * @return Reference to this [[KinesisInputDStream.Builder]] + * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] + */ + // scalastyle:off line.size.limit def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = { this.metricsEnabledDimensions = Option(metricsEnabledDimensions) this From fd2229c76fce9a37448546cf42cfec937a349200 Mon Sep 17 00:00:00 2001 From: Kengo Seki Date: Fri, 23 Aug 2019 19:18:54 +0900 Subject: [PATCH 3/4] Fix a wrong scalastyle directive --- .../apache/spark/streaming/kinesis/KinesisInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index 1cd7fc914556..61f927d01dbe 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -304,7 +304,7 @@ object KinesisInputDStream { * @return Reference to this [[KinesisInputDStream.Builder]] * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] */ - // scalastyle:off line.size.limit + // scalastyle:on line.size.limit def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = { this.metricsEnabledDimensions = Option(metricsEnabledDimensions) this From c9341a0b2e92a658593e47d39b02eb1026d87235 Mon Sep 17 00:00:00 2001 From: Kengo Seki Date: Sat, 7 Sep 2019 22:22:35 +0900 Subject: [PATCH 4/4] [SPARK-27420] Additional documentation fixes * Kinesis integration documentation * Add explanation and usage examples for the new API * Fix the existing Java grammatical mistakes * Scaladoc * Insert 
newlines between the @see directives and URLs to avoid disabling scalastyle * Remove unnecessary brackets around a standard class * Use two spaces for a continuation indent --- docs/streaming-kinesis-integration.md | 42 ++++++++++++------- .../kinesis/KinesisInputDStream.scala | 14 +++---- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 55acec53302e..26a83e52de2b 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -64,13 +64,13 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
- import org.apache.spark.storage.StorageLevel - import org.apache.spark.streaming.kinesis.KinesisInputDStream - import org.apache.spark.streaming.Seconds - import org.apache.spark.streaming.StreamingContext - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + import org.apache.spark.storage.StorageLevel; + import org.apache.spark.streaming.kinesis.KinesisInputDStream; + import org.apache.spark.streaming.Seconds; + import org.apache.spark.streaming.StreamingContext; + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - KinesisInputDStream kinesisStream = KinesisInputDStream.builder + KinesisInputDStream kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) @@ -98,14 +98,21 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
- You may also provide a "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. This is currently only supported in Scala and Java. + You may also provide the following settings. These are currently only supported in Scala and Java. + + - A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. + + - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details.
+ import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration + import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) @@ -116,17 +123,22 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .metricsLevel(MetricsLevel.DETAILED) + .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])
- import org.apache.spark.storage.StorageLevel - import org.apache.spark.streaming.kinesis.KinesisInputDStream - import org.apache.spark.streaming.Seconds - import org.apache.spark.streaming.StreamingContext - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream - - KinesisInputDStream kinesisStream = KinesisInputDStream.builder + import org.apache.spark.storage.StorageLevel; + import org.apache.spark.streaming.kinesis.KinesisInputDStream; + import org.apache.spark.streaming.Seconds; + import org.apache.spark.streaming.StreamingContext; + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; + import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; + import scala.collection.JavaConverters; + + KinesisInputDStream kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) @@ -135,6 +147,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .metricsLevel(MetricsLevel.DETAILED) + .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) .buildWithMessageHandler([message handler]);
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala index 3eacb198da86..8c3931a1c87f 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -275,33 +275,31 @@ object KinesisInputDStream { this } - // scalastyle:off line.size.limit /** * Sets the CloudWatch metrics level. Defaults to * [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified. * * @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level * @return Reference to this [[KinesisInputDStream.Builder]] - * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] + * @see + * [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] */ - // scalastyle:on line.size.limit def metricsLevel(metricsLevel: MetricsLevel): Builder = { this.metricsLevel = Option(metricsLevel) this } - // scalastyle:off line.size.limit /** * Sets the enabled CloudWatch metrics dimensions. Defaults to * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]] * if no custom value is specified. 
* - * @param metricsEnabledDimensions [[Set[String]]] to specify - * the enabled CloudWatch metrics dimensions + * @param metricsEnabledDimensions Set[String] to specify which CloudWatch metrics dimensions + * should be enabled * @return Reference to this [[KinesisInputDStream.Builder]] - * @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] + * @see + * [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]] */ - // scalastyle:on line.size.limit def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = { this.metricsEnabledDimensions = Option(metricsEnabledDimensions) this