42 changes: 28 additions & 14 deletions docs/streaming-kinesis-integration.md
@@ -64,13 +64,13 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards

</div>
<div data-lang="java" markdown="1">
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
@@ -98,14 +98,21 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards
</div>
</div>

You may also provide a "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. This is currently only supported in Scala and Java.
You may also provide the following settings. These are currently only supported in Scala and Java.

- A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key.

- CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details.
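
As an illustration only (not taken from the Spark examples), a message handler is just a function from `Record` to your result type. A minimal Scala sketch that keeps both the partition key and the payload could look like the following; the name `keyAndPayload` and the `(String, String)` result type are assumptions made for this example:

    import java.nio.charset.StandardCharsets

    import com.amazonaws.services.kinesis.model.Record

    // Pair each record's partition key with its payload decoded as a UTF-8 string.
    val keyAndPayload: Record => (String, String) = { record =>
      val buffer = record.getData                  // payload as a java.nio.ByteBuffer
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)                            // copy the bytes out of the buffer
      (record.getPartitionKey, new String(bytes, StandardCharsets.UTF_8))
    }

Such a function can then be passed wherever the examples below show `[message handler]`, e.g. `.buildWithMessageHandler(keyAndPayload)`.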

<div class="codetabs">
<div data-lang="scala" markdown="1">
import collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
@@ -116,17 +123,22 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
.metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
.buildWithMessageHandler([message handler])

</div>
<div data-lang="java" markdown="1">
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import scala.collection.JavaConverters;

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
@@ -135,6 +147,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.metricsLevel(MetricsLevel.DETAILED)
.metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
.buildWithMessageHandler([message handler]);

</div>
KinesisInputDStream.scala
@@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis

import scala.reflect.ClassTag

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import collection.JavaConverters._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.rdd.RDD
@@ -43,7 +45,9 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
val messageHandler: Record => T,
val kinesisCreds: SparkAWSCredentials,
val dynamoDBCreds: Option[SparkAWSCredentials],
val cloudWatchCreds: Option[SparkAWSCredentials]
val cloudWatchCreds: Option[SparkAWSCredentials],
val metricsLevel: MetricsLevel,
val metricsEnabledDimensions: Set[String]
) extends ReceiverInputDStream[T](_ssc) {

import KinesisReadConfigurations._
@@ -79,7 +83,8 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
kinesisCreds, dynamoDBCreds, cloudWatchCreds)
kinesisCreds, dynamoDBCreds, cloudWatchCreds,
metricsLevel, metricsEnabledDimensions)
}
}

@@ -104,6 +109,8 @@ object KinesisInputDStream {
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
private var dynamoDBCredsProvider: Option[SparkAWSCredentials] = None
private var cloudWatchCredsProvider: Option[SparkAWSCredentials] = None
private var metricsLevel: Option[MetricsLevel] = None
private var metricsEnabledDimensions: Option[Set[String]] = None

/**
* Sets the StreamingContext that will be used to construct the Kinesis DStream. This is a
@@ -237,6 +244,7 @@ object KinesisInputDStream {
* endpoint. Defaults to [[DefaultCredentialsProvider]] if no custom value is specified.
*
* @param credentials [[SparkAWSCredentials]] to use for Kinesis authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def kinesisCredentials(credentials: SparkAWSCredentials): Builder = {
kinesisCredsProvider = Option(credentials)
@@ -248,6 +256,7 @@ object KinesisInputDStream {
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for DynamoDB authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder = {
dynamoDBCredsProvider = Option(credentials)
@@ -259,12 +268,43 @@ object KinesisInputDStream {
* endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
*
* @param credentials [[SparkAWSCredentials]] to use for CloudWatch authentication
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder = {
cloudWatchCredsProvider = Option(credentials)
this
}

/**
* Sets the CloudWatch metrics level. Defaults to
* [[KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL]] if no custom value is specified.
*
* @param metricsLevel [[MetricsLevel]] to specify the CloudWatch metrics level
* @return Reference to this [[KinesisInputDStream.Builder]]
* @see
* [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
*/
def metricsLevel(metricsLevel: MetricsLevel): Builder = {
this.metricsLevel = Option(metricsLevel)
this
}

/**
* Sets the enabled CloudWatch metrics dimensions. Defaults to
* [[KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS]]
* if no custom value is specified.
*
* @param metricsEnabledDimensions Set[String] to specify which CloudWatch metrics dimensions
* should be enabled
* @return Reference to this [[KinesisInputDStream.Builder]]
* @see
* [[https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels]]
*/
def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
this.metricsEnabledDimensions = Option(metricsEnabledDimensions)
this
}
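
  // Illustrative usage of the two builder options above (a sketch only, not part
  // of this source file). The stream name, app name, and the choice of SUMMARY
  // level are assumptions made for the example:
  //
  //   val stream = KinesisInputDStream.builder
  //     .streamingContext(ssc)
  //     .streamName("myKinesisStream")
  //     .checkpointAppName("myKinesisApp")
  //     .metricsLevel(MetricsLevel.SUMMARY)
  //     .metricsEnabledDimensions(
  //       KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
  //     .build()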

/**
* Create a new instance of [[KinesisInputDStream]] with configured parameters and the provided
* message handler.
@@ -287,7 +327,9 @@ object KinesisInputDStream {
ssc.sc.clean(handler),
kinesisCredsProvider.getOrElse(DefaultCredentials),
dynamoDBCredsProvider,
cloudWatchCredsProvider)
cloudWatchCredsProvider,
metricsLevel.getOrElse(DEFAULT_METRICS_LEVEL),
metricsEnabledDimensions.getOrElse(DEFAULT_METRICS_ENABLED_DIMENSIONS))
}

/**
@@ -324,4 +366,8 @@ object KinesisInputDStream {
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
private[kinesis] val DEFAULT_METRICS_LEVEL: MetricsLevel =
KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
private[kinesis] val DEFAULT_METRICS_ENABLED_DIMENSIONS: Set[String] =
KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet
}
KinesisReceiver.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal

import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.internal.Logging
@@ -92,7 +93,9 @@ private[kinesis] class KinesisReceiver[T](
messageHandler: Record => T,
kinesisCreds: SparkAWSCredentials,
dynamoDBCreds: Option[SparkAWSCredentials],
cloudWatchCreds: Option[SparkAWSCredentials])
cloudWatchCreds: Option[SparkAWSCredentials],
metricsLevel: MetricsLevel,
metricsEnabledDimensions: Set[String])
extends Receiver[T](storageLevel) with Logging { receiver =>

/*
@@ -162,6 +165,8 @@ private[kinesis] class KinesisReceiver[T](
.withKinesisEndpoint(endpointUrl)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
.withMetricsLevel(metricsLevel)
.withMetricsEnabledDimensions(metricsEnabledDimensions.asJava)

// Update the Kinesis client lib config with timestamp
// if InitialPositionInStream.AT_TIMESTAMP is passed
KinesisUtils.scala
@@ -75,7 +75,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, DefaultCredentials, None, None)
cleanedHandler, DefaultCredentials, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -132,7 +134,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, kinesisCredsProvider, None, None)
cleanedHandler, kinesisCredsProvider, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -202,7 +206,9 @@ object KinesisUtils {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, kinesisCredsProvider, None, None)
cleanedHandler, kinesisCredsProvider, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -248,7 +254,9 @@ object KinesisUtils {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None)
KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

@@ -299,7 +307,9 @@ object KinesisUtils {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None)
KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None,
KinesisInputDStream.DEFAULT_METRICS_LEVEL,
KinesisInputDStream.DEFAULT_METRICS_ENABLED_DIMENSIONS)
}
}

KinesisInputDStreamBuilderSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.streaming.kinesis

import java.util.Calendar

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import collection.JavaConverters._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar

@@ -82,6 +84,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
assert(dstream.kinesisCreds == DefaultCredentials)
assert(dstream.dynamoDBCreds == None)
assert(dstream.cloudWatchCreds == None)
assert(dstream.metricsLevel == DEFAULT_METRICS_LEVEL)
assert(dstream.metricsEnabledDimensions == DEFAULT_METRICS_ENABLED_DIMENSIONS)
}

test("should propagate custom non-auth values to KinesisInputDStream") {
@@ -94,6 +98,9 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
val customKinesisCreds = mock[SparkAWSCredentials]
val customDynamoDBCreds = mock[SparkAWSCredentials]
val customCloudWatchCreds = mock[SparkAWSCredentials]
val customMetricsLevel = MetricsLevel.NONE
val customMetricsEnabledDimensions =
KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS.asScala.toSet

val dstream = builder
.endpointUrl(customEndpointUrl)
@@ -105,6 +112,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
.metricsLevel(customMetricsLevel)
.metricsEnabledDimensions(customMetricsEnabledDimensions)
.build()
assert(dstream.endpointUrl == customEndpointUrl)
assert(dstream.regionName == customRegion)
@@ -115,6 +124,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
assert(dstream.kinesisCreds == customKinesisCreds)
assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
assert(dstream.metricsLevel == customMetricsLevel)
assert(dstream.metricsEnabledDimensions == customMetricsEnabledDimensions)

// Testing with AtTimestamp
val cal = Calendar.getInstance()
@@ -132,6 +143,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
.metricsLevel(customMetricsLevel)
.metricsEnabledDimensions(customMetricsEnabledDimensions)
.build()
assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
assert(dstreamAtTimestamp.regionName == customRegion)
@@ -145,6 +158,8 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
assert(dstreamAtTimestamp.metricsLevel == customMetricsLevel)
assert(dstreamAtTimestamp.metricsEnabledDimensions == customMetricsEnabledDimensions)
}

test("old Api should throw UnsupportedOperationExceptionexception with AT_TIMESTAMP") {