[SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs #9970
Closed
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m | |
|
|
||
| **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.** | ||
|
|
||
| 2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream as follows: | ||
| 2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows: | ||
|
|
||
| <div class="codetabs"> | ||
| <div data-lang="scala" markdown="1"> | ||
|
|
@@ -49,7 +49,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m | |
| [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2); | ||
|
|
||
| See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) | ||
| and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example. | ||
| and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the Running the Example subsection for instructions to run the example. | ||
|
|
||
| </div> | ||
| <div data-lang="python" markdown="1"> | ||
|
|
@@ -60,18 +60,47 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m | |
| [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) | ||
|
|
||
| See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils) | ||
| and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the next subsection for instructions to run the example. | ||
| and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the Running the Example subsection for instructions to run the example. | ||
|
Member
Same as the above comment.
||
|
|
||
| </div> | ||
| </div> | ||
|
|
||
| - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream | ||
| You may also provide a "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. This is currently only supported in Java and Python. | ||
|
Member
only in Java and Python? It should be
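To make the shape of a "message handler function" concrete, here is a minimal sketch of a function that takes a record and returns a value of some type `T` built from the payload plus the partition key. Everything in it is an assumption for illustration: `StandInRecord` is a hypothetical stand-in for the Kinesis `Record` class, `key_and_payload` and `apply_handler` are made-up names, and Python is used only to show the idea. It does not imply which language bindings of `KinesisUtils.createStream` accept a handler.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class StandInRecord:
    """Hypothetical stand-in for a Kinesis Record; not the AWS SDK class."""
    partition_key: str
    sequence_number: str
    data: bytes


def key_and_payload(record: StandInRecord) -> Tuple[str, str]:
    """Example handler: keep the partition key next to the decoded payload."""
    return record.partition_key, record.data.decode("utf-8")


def apply_handler(records: List[StandInRecord],
                  handler: Callable[[StandInRecord], T]) -> List[T]:
    """Apply the handler to each record, as a receiver would before storing it."""
    return [handler(r) for r in records]


records = [
    StandInRecord("user-1", "1", b"hello"),
    StandInRecord("user-2", "2", b"world"),
]
result = apply_handler(records, key_and_payload)
```

Without a handler, only the payload bytes would reach the DStream; with one, each element carries whatever the handler chose to extract.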
||
|
|
||
| - `[Kineiss app name]`: The application name that will be used to checkpoint the Kinesis | ||
| <div class="codetabs"> | ||
| <div data-lang="scala" markdown="1"> | ||
|
|
||
| import org.apache.spark.streaming.Duration | ||
| import org.apache.spark.streaming.kinesis._ | ||
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream | ||
|
|
||
| val kinesisStream = KinesisUtils.createStream[T]( | ||
| streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], | ||
| [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, | ||
| [message handler]) | ||
|
|
||
| </div> | ||
| <div data-lang="java" markdown="1"> | ||
|
|
||
| import org.apache.spark.streaming.Duration; | ||
| import org.apache.spark.streaming.kinesis.*; | ||
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; | ||
|
|
||
| JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream( | ||
| streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], | ||
| [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, | ||
| [message handler], [class T]); | ||
|
|
||
| </div> | ||
| </div> | ||
|
|
||
| - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream | ||
|
|
||
| - `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis | ||
| sequence numbers in DynamoDB table. | ||
| - The application name must be unique for a given account and region. | ||
| - If the table exists but has incorrect checkpoint information (for a different stream, or | ||
| old expired sequenced numbers), then there may be temporary errors. | ||
| old expired sequenced numbers), then there may be temporary errors. | ||
|
|
||
| - `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from. | ||
|
|
||
|
|
@@ -83,6 +112,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m | |
|
|
||
| - `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details). | ||
|
|
||
| - `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`. | ||
|
|
||
| In other versions of the API, you can also specify the AWS access key and secret key directly. | ||
|
|
||
| 3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). | ||
|
|
@@ -99,7 +130,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m | |
| <img src="img/streaming-kinesis-arch.png" | ||
| title="Spark Streaming Kinesis Architecture" | ||
| alt="Spark Streaming Kinesis Architecture" | ||
| width="60%" | ||
| width="60%" | ||
| /> | ||
| <!-- Images are downsized intentionally to improve quality on retina displays --> | ||
| </p> | ||
|
|
@@ -165,11 +196,16 @@ To run the example, | |
|
|
||
| This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example. | ||
|
|
||
| #### Record De-aggregation | ||
|
|
||
| When data is generated using the [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html), messages may be aggregated for cost savings. Spark Streaming will automatically | ||
| de-aggregate records during consumption. | ||
|
|
||
| #### Kinesis Checkpointing | ||
| - Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the DStream left off. | ||
|
|
||
| - Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy. | ||
|
|
||
| - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPositionInStream.LATEST). This is configurable. | ||
| - InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored). | ||
| - InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored). | ||
| - InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency. | ||
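The checkpointing notes above mention handling throttling with a random-backoff-retry strategy. The sketch below shows one way such a strategy could look; it is not taken from the provided example. `ThrottledError`, `checkpoint_with_backoff`, and the default limits are all hypothetical names and values chosen for illustration.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a throttling error from the checkpoint store."""


def checkpoint_with_backoff(do_checkpoint, max_attempts=5, max_sleep_s=0.1,
                            sleep=time.sleep, rng=random.random):
    """Call do_checkpoint, retrying on ThrottledError after a random pause.

    Returns the number of attempts used. Re-raises the last ThrottledError
    if every attempt is throttled.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            do_checkpoint()
            return attempt
        except ThrottledError:
            if attempt == max_attempts:
                raise
            # Sleep for a random duration in [0, max_sleep_s) so that
            # concurrent workers do not all retry at the same moment.
            sleep(rng() * max_sleep_s)
```

The `sleep` and `rng` parameters are injectable so the retry logic can be exercised without real delays. A longer checkpoint interval reduces how often this path is hit in the first place.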
Could you add a link for Running the Example, such as [Running the Example](#Running the Example)?