
Commit 036d219

cfregly authored and tdas committed
updated kinesis docs and added an arch diagram
1 parent 24f622a commit 036d219

File tree: 2 files changed (+50 -28 lines)

docs/img/streaming-kinesis-arch.png (new binary image, 113 KB)

docs/streaming-kinesis-integration.md

Lines changed: 50 additions & 28 deletions
@@ -3,8 +3,8 @@ layout: global
 title: Spark Streaming + Kinesis Integration
 ---
 [Amazon Kinesis](http://aws.amazon.com/kinesis/) is a fully managed service for real-time processing of streaming data at massive scale.
-The Kinesis input DStream and receiver uses the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
-The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concept of Workers, Checkpoints, and Shard Leases.
+The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
+The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
 Here we explain how to configure Spark Streaming to receive data from Kinesis.

 #### Configuring Kinesis
@@ -15,30 +15,33 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 
 #### Configuring Spark Streaming Application
 
-1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
 
 	groupId = org.apache.spark
 	artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
 	version = {{site.SPARK_VERSION_SHORT}}
 
 	**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
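
For SBT users, the Maven-style coordinates above translate to roughly the following build-definition line. This is a sketch: `"1.1.0"` merely stands in for the templated `{{site.SPARK_VERSION_SHORT}}` and should match your Spark build.

```scala
// Hypothetical sbt equivalent of the coordinates above; %% appends the Scala
// binary version, matching the artifactId spark-streaming-kinesis-asl_<scala-version>.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.1.0"
```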
 
-2. **Programming:** In the streaming application code, import `KinesisUtils` and create input DStream as follows.
+2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream as follows:
 
 	<div class="codetabs">
 	<div data-lang="scala" markdown="1">
+		import org.apache.spark.streaming.Duration
 		import org.apache.spark.streaming.kinesis._
 		import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 
 		val kinesisStream = KinesisUtils.createStream(
 			streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
 
 	See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
-	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the next subsection for instructions to run the example.
+	and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
 
 	</div>
 	<div data-lang="java" markdown="1">
-		import org.apache.spark.streaming.flume.*;
+		import org.apache.spark.streaming.Duration;
+		import org.apache.spark.streaming.kinesis.*;
+		import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 
 		JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
 			streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
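
To make the bracketed placeholders concrete, here is a minimal, hypothetical Scala sketch of step 2, following the five-argument `createStream` form shown above. The app name, stream name, endpoint URL, and intervals are illustrative values, not prescribed ones.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// The app name set here also becomes the Kinesis application name
// (see the points to remember below).
val sparkConf = new SparkConf().setAppName("MyKinesisApp")
val batchInterval = Seconds(2)
val ssc = new StreamingContext(sparkConf, batchInterval)

// Checkpoint interval set equal to the batch interval, as the guide suggests.
val kinesisStream = KinesisUtils.createStream(
  ssc, "myKinesisStream", "https://kinesis.us-east-1.amazonaws.com",
  batchInterval, InitialPositionInStream.TRIM_HORIZON)

// Each element of the DStream is one Kinesis record's raw payload.
val lines = kinesisStream.map(bytes => new String(bytes, "UTF-8"))
```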
@@ -48,33 +51,54 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 
 </div>
 </div>
+`streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
 
-`[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
+`[Kinesis stream name]`: The Kinesis stream that this streaming application receives from
 
-`[checkpoint interval]`: The interval at which the Kinesis client library is going to save its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
+*Points to remember:*
 
-`[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see later section and Amazon Kinesis API documentation for more details).
+- The application name used in the streaming context becomes the Kinesis application name.
+- The application name must be unique for a given account and region.
+- The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
+- Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.
 
-*Points to remember:*
+`[endpoint URL]`: Valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
 
-- The name used in the context of the streaming application must be unique for a given account and region. Changing the app name or stream name could lead to Kinesis errors as only a single logical application can process a single stream.
-- A single Kinesis input DStream can receive many Kinesis shards by spinning up multiple KinesisRecordProcessor threads. Note that there is no correlation between number of shards in Kinesis and the number of partitions in the generated RDDs that is used for processing the data.
-- You never need more KinesisReceivers than the number of shards in your stream as each will spin up at least one KinesisRecordProcessor thread.
-- Horizontal scaling is achieved by autoscaling additional Kinesis input DStreams (separate processes) up to the number of current shards for a given stream, of course.
+`[checkpoint interval]`: The interval (e.g. Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
 
-3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+`[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see the Kinesis Checkpointing section and the Amazon Kinesis API documentation for more details).
 
-- A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL. It is used to store KCL's checkpoint information.
+3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}`, which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
 
-- If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch.
+*Points to remember at runtime:*
+- Kinesis data processing is ordered per partition and occurs at-least once per message.
+- Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.
+- A single Kinesis stream shard is processed by one input DStream at a time.
+<p style="text-align: center;">
+  <img src="img/streaming-kinesis-arch.png"
+       title="Spark Streaming Kinesis Architecture"
+       alt="Spark Streaming Kinesis Architecture"
+       width="60%" />
+  <!-- Images are downsized intentionally to improve quality on retina displays -->
+</p>
+- A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.
+- Multiple input DStreams running in separate processes/instances can read from a Kinesis stream.
+- You never need more Kinesis input DStreams than the number of Kinesis stream shards, as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard.
+- Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point (see the sketch after this list).
+- The Kinesis input DStream will balance the load between all DStreams - even across processes/instances.
+- The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.
+- As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible.
+- Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details.
+- There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are 2 independent partitioning schemes.
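
As referenced in the horizontal-scaling point above, a common pattern is to create several Kinesis input DStreams (up to the shard count) and union them. This is a hypothetical sketch reusing the `ssc`, `batchInterval`, stream name, and endpoint values from the earlier snippet; the shard count is illustrative.

```scala
// Pair-DStream implicits (reduceByKey) on older Spark versions.
import org.apache.spark.streaming.StreamingContext._

// Create one Kinesis input DStream per shard (2 here, purely illustrative),
// then union them so downstream processing sees a single DStream.
val numShards = 2
val kinesisStreams = (0 until numShards).map { _ =>
  KinesisUtils.createStream(
    ssc, "myKinesisStream", "https://kinesis.us-east-1.amazonaws.com",
    batchInterval, InitialPositionInStream.TRIM_HORIZON)
}
val unionedStream = ssc.union(kinesisStreams)

// Word counts across all shards, one result per batch.
val words = unionedStream.flatMap(bytes => new String(bytes, "UTF-8").split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
```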
 
 #### Running the Example
 To run the example,
+
 - Download Spark source and follow the [instructions](building-with-maven.html) to build Spark with profile *-Pkinesis-asl*.
 
 	mvn -Pkinesis-asl -DskipTests clean package
 
-- Set up Kinesis stream (see earlier section). Note the name of the Kinesis stream, and the endpoint URL corresponding to the region the stream is based on.
+- Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.
 
 - Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials.
 
@@ -93,19 +117,17 @@ To run the example,
 </div>
 </div>
 
-This will wait for data to be received from Kinesis.
+This will wait for data to be received from the Kinesis stream.
 
-- To generate random string data, in another terminal, run the associated Kinesis data producer.
+- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.
 
 	bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
 
-This will push random words to the Kinesis stream, which should then be received and processed by the running example.
+This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
 
 #### Kinesis Checkpointing
-The Kinesis receiver checkpoints the position of the stream that has been read periodically, so that the system can recover from failures and continue processing where it had left off. Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.
-
-- If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable.
-
-- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored). In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data.
-
-- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency.
+- Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the DStream left off.
+- Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.
+- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPositionInStream.LATEST). This is configurable.
+- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
+- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
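
One point of frequent confusion worth illustrating: the DynamoDB checkpointing described here is separate from Spark Streaming's own metadata checkpointing. A hypothetical sketch, continuing the earlier snippet (the directory path is illustrative):

```scala
// 1) Spark Streaming checkpointing: saves DStream metadata to a reliable
//    filesystem so the driver can recover (the path is illustrative).
ssc.checkpoint("hdfs:///checkpoints/my-kinesis-app")

// 2) Kinesis (KCL) checkpointing: saves the stream position to DynamoDB.
//    It is controlled by the [checkpoint interval] argument passed to
//    KinesisUtils.createStream, not by ssc.checkpoint.
```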
