@@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis

import scala.reflect.ClassTag

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import scala.collection.JavaConverters._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.rdd.RDD
@@ -43,7 +45,9 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
val messageHandler: Record => T,
val kinesisCreds: SparkAWSCredentials,
val dynamoDBCreds: Option[SparkAWSCredentials],
val cloudWatchCreds: Option[SparkAWSCredentials]
val cloudWatchCreds: Option[SparkAWSCredentials],
val metricsLevel: MetricsLevel,
val metricsEnabledDimensions: Set[String]
) extends ReceiverInputDStream[T](_ssc) {

import KinesisReadConfigurations._
@@ -79,7 +83,8 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
kinesisCreds, dynamoDBCreds, cloudWatchCreds)
kinesisCreds, dynamoDBCreds, cloudWatchCreds,
metricsLevel, metricsEnabledDimensions)
}
}

@@ -104,6 +109,8 @@ object KinesisInputDStream {
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None
private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None
private var metricsLevel: Option[MetricsLevel] = None
private var metricsEnabledDimensions: Option[Set[String]] = None

/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
@@ -237,6 +244,7 @@ object KinesisInputDStream {
* endpoint. Defaults to [[DefaultCredentialsProvider]] if no custom value is specified.
*
* @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def kinesisCredentials(credentials: SparkAWSCredentials): Builder = {
kinesisCredsProvider = Option(credentials)
@@ -248,6 +256,7 @@ object KinesisInputDStream {
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = {
dynamoDBCredsProvider = Option(credentials)
@@ -259,12 +268,45 @@ object KinesisInputDStream {
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = {
cloudWatchCredsProvider = Option(credentials)
this
}

// scalastyle:off line.size.limit
/**
* Sets the CloudWatch metrics level. Defaults to
* [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified.
*
* @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level
* @return Reference to this [[KinesisInputDStream.Builder]]
* @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
*/
// scalastyle:on line.size.limit
def metricsLevel(metricsLevel: MetricsLevel): Builder = {
this.metricsLevel = Option(metricsLevel)
this
}

// scalastyle:off line.size.limit
/**
* Sets the enabled CloudWatch metrics dimensions. Defaults to
* [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]]
* if no custom value is specified.
*
* @param metricsEnabledDimensions [[Set[String]]] to specify
Member:

This is a small thing, but I know we have had problems generating scaladoc with references to library code. It might be worth building the doc HTML (only) as in https://github.com/apache/spark/blob/master/docs/README.md to make sure.

You don't really have to link a type like Set.
Also we usually do a continuation indent of two spaces.

* the enabled CloudWatch metrics dimensions
* @return Reference to this [[KinesisInputDStream.Builder]]
* @see [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
Member:

If it helps avoid turning off scalastyle, you could just write:

See
[[...]]

in the main body of the doc above. That seems short enough.

*/
// scalastyle:on line.size.limit
def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
this.metricsEnabledDimensions = Option(metricsEnabledDimensions)
this
}
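
Folding in the review suggestions above (drop the scaladoc link on a plain type like Set, and move the external link into the doc body) lets this doc fit the 100-character line limit without the scalastyle:off/on pair. A possible sketch, with final wording left to the author:

    /**
     * Sets the enabled CloudWatch metrics dimensions. Defaults to
     * [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]] if no custom value
     * is specified. See
     * [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]].
     *
     * @param metricsEnabledDimensions the enabled CloudWatch metrics dimensions
     * @return Reference to this [[KinesisInputDStream.Builder]]
     */
    def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
      this.metricsEnabledDimensions = Option(metricsEnabledDimensions)
      this
    }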

/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided
* message handler.
@@ -287,7 +329,9 @@ object KinesisInputDStream {
ssc.sc.clean(handler),
kinesisCredsProvider.getOrElse(DefaultCredentials),
dynamoDBCredsProvider,
cloudWatchCredsProvider)
cloudWatchCredsProvider,
metricsLevel.getOrElse(DEFAULT_METRICS_LEVEL),
metricsEnabledDimensions.getOrElse(DEFAULT_METRICS_ENABLED_DIMENSIONS))
}

/**
@@ -324,4 +368,8 @@ object KinesisInputDStream {
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
private[kinesis] val DEFAULT_METRICS_LEVEL: MetricsLevel =
KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
private[kinesis] val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] =
KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
}
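
Taken together, the new hooks make the KCL's CloudWatch reporting tunable from the public builder. A minimal usage sketch, assuming an existing StreamingContext named ssc; the stream, endpoint, region, and application names are placeholders:

    import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    import org.apache.spark.streaming.kinesis.KinesisInputDStream

    // Only the two metrics* calls are new in this patch; the rest is the existing builder API.
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)                                  // required
      .streamName("myStream")                                 // required; placeholder name
      .checkpointAppName("myKinesisApp")                      // required; also names the DynamoDB table
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com") // optional; defaults shown above
      .regionName("us-east-1")
      .metricsLevel(MetricsLevel.SUMMARY)                     // KCL levels: NONE, SUMMARY, DETAILED
      .metricsEnabledDimensions(Set("Operation", "ShardId"))  // ShardId enables per-shard metrics
      .build()

Leaving the two new calls out preserves current behavior: build() falls back to the KCL defaults captured in DEFAULT_METRICS_LEVEL and DEFAULT_METRICS_ENABLED_DIMENSIONS above.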
----------------------------------------------------------------------
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal

import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.internal.Logging
@@ -92,7 +93,9 @@ private[kinesis] class KinesisReceiver[T](
messageHandler: Record => T,
kinesisCreds: SparkAWSCredentials,
dynamoDBCreds: Option[SparkAWSCredentials],
cloudWatchCreds: Option[SparkAWSCredentials])
cloudWatchCreds: Option[SparkAWSCredentials],
metricsLevel: MetricsLevel,
metricsEnabledDimensions: Set[String])
extends Receiver[T](storageLevel) with Logging { receiver =>

/*
@@ -162,6 +165,8 @@
.withKinesisEndpoint(endpointUrl)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
.withMetricsLevel(metricsLevel)
.withMetricsEnabledDimensions(metricsEnabledDimensions.asJava)

// Update the Kinesis client lib config with timestamp
// if InitialPositionInStream.AT_TIMESTAMP is passed
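A note on the conversion at the KCL boundary: withMetricsEnabledDimensions expects a java.util.Set, while the DStream carries a Scala Set, hence the asJava here and the asScala.toSet on the default in the companion object. A minimal round-trip sketch of the JavaConverters calls involved:

    import scala.collection.JavaConverters._

    val scalaDims: Set[String] = Set("Operation", "ShardId")
    // Scala -> Java, as in .withMetricsEnabledDimensions(metricsEnabledDimensions.asJava)
    val javaDims: java.util.Set[String] = scalaDims.asJava
    // Java -> Scala, as in DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
    val roundTrip: Set[String] = javaDims.asScala.toSet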
----------------------------------------------------------------------
@@ -75,7 +75,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, DefaultCredentials, None, None)
cleanedHandler, DefaultCredentials, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -132,7 +134,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, kinesisCredsProvider, None, None)
cleanedHandler, kinesisCredsProvider, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -202,7 +206,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, kinesisCredsProvider, None, None)
cleanedHandler, kinesisCredsProvider, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -248,7 +254,9 @@ object KinesisUtils {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None)
KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -299,7 +307,9 @@ object KinesisUtils {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None)
KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

----------------------------------------------------------------------
@@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis

import java.util.Calendar

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import scala.collection.JavaConverters._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar

@@ -82,6 +84,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
assert(dstream.kinesisCreds == DefaultCredentials)
assert(dstream.dynamoDBCreds == None)
assert(dstream.cloudWatchCreds == None)
assert(dstream.metricsLevel == DEFAULT_METRICS_LEVEL)
assert(dstream.metricsEnabledDimensions == DEFAULT_METRICS_ENABLED_DIMENSIONS)
}

test("should propagate custom non-auth values to KinesisInputDStream") {
@@ -94,6 +98,9 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
val customKinesisCreds = mock[SparkAWSCredentials]
val customDynamoDBCreds = mock[SparkAWSCredentials]
val customCloudWatchCreds = mock[SparkAWSCredentials]
val customMetricsLevel = MetricsLevel.NONE
val customMetricsEnabledDimensions =
KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet

val dstream = builder
.endpointUrl(customEndpointUrl)
@@ -105,6 +112,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
.metricsLevel(customMetricsLevel)
.metricsEnabledDimensions(customMetricsEnabledDimensions)
.build()
assert(dstream.endpointUrl == customEndpointUrl)
assert(dstream.regionName == customRegion)
@@ -115,6 +124,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
assert(dstream.kinesisCreds == customKinesisCreds)
assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
assert(dstream.metricsLevel == customMetricsLevel)
assert(dstream.metricsEnabledDimensions == customMetricsEnabledDimensions)

// Testing with AtTimestamp
val cal = Calendar.getInstance()
@@ -132,6 +143,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
.metricsLevel(customMetricsLevel)
.metricsEnabledDimensions(customMetricsEnabledDimensions)
.build()
assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
assert(dstreamAtTimestamp.regionName == customRegion)
@@ -145,6 +158,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
assert(dstreamAtTimestamp.metricsLevel == customMetricsLevel)
assert(dstreamAtTimestamp.metricsEnabledDimensions == customMetricsEnabledDimensions)
}

test("old Api should throw UnsupportedOperationExceptionexception with AT_TIMESTAMP") {