[SPARK-20140][DStream] Remove hardcoded kinesis retry wait and max retries #17467
yssharma wants to merge 1 commit into apache:master from
Conversation
Hi @yssharma, this PR looks like a non-trivial change that needs a JIRA. Please refer to http://spark.apache.org/contributing.html.
Have got this JIRA ticket for the patch: https://issues.apache.org/jira/browse/SPARK-20140
Oh right, you meant the PR title format. I will reject this PR and post a new one. Thanks @HyukjinKwon
You could just edit the title. I think closing this and opening a new one is also fine and an option.
@HyukjinKwon What should be the next steps for this PR? Are there any Spark-Kinesis experts who can review the patch?
I am not familiar with Kinesis. I usually click the blame button and check both the recent code modifiers and committers, e.g., https://github.com/yssharma/spark/blame/4b589adeaef540f6227266ecc628ad41ef0733c3/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala. I assume @tdas and @brkyvz are experts in this area.
You're a gem @HyukjinKwon 💯. I will wait for Tathagata's and Burak's inputs then :)
…sient annotations

The Scala 2.11 SBT build currently fails for Spark 1.6.0 and master due to warnings about the `transient` annotation:

```
[error] [warn] /Users/joshrosen/Documents/spark/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala:73: no valid targets for annotation on value sc - it is discarded unused. You may specify targets with meta-annotations, e.g. (transient param)
[error] [warn] transient sc: SparkContext,
```

The fix implemented here is the same as what we did in #8433: remove the `transient` annotations when they are not necessary, and use `transient private val` in the remaining cases.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10479 from JoshRosen/fix-sbt-2.11.
Force-pushed from 4b589ad to 67bcf26
Force-pushed from 89e27fd to ccb6c19
Test build #3634 has finished for PR 17467 at commit
@srowen - Could I get some love here as well? Thanks
srowen
left a comment
It seems reasonable but I don't know enough to review Kinesis changes, really
define this once during class initialization
brkyvz
left a comment
Thanks for working on this PR! Sorry for not being able to take a look earlier. Some feedback:
- I thought the `kinesisConfigs` map was there to make testing easier, but it didn't, since you set the SparkConfs as well as the map, so I suggest we remove it.
- Let's make `spark.streaming.kinesis.retry.wait.time` a duration string, allowing it to take `100ms` instead of having it be in millis. You can use https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java#L244
- May be a tad bit shorter to name them as:
  - `spark.streaming.kinesis.retry.waitTime`
  - `spark.streaming.kinesis.retry.maxAttempts`
define this once during class initialization as well
We can take duration strings, such as `100ms` or `1s`; I would prefer we use that.
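For reference, the helper brkyvz links (`JavaUtils.timeStringAsMs`) is what turns strings like `100ms` or `1s` into milliseconds. Below is a minimal standalone stand-in written only to illustrate that behavior; the real Spark helper supports more suffixes and richer error messages.

```scala
// Minimal stand-in for org.apache.spark.network.util.JavaUtils.timeStringAsMs,
// shown only to illustrate the duration-string behavior suggested in review.
object DurationSketch {
  private val Pattern = """(\d+)\s*(ms|s|m|min|h)?""".r

  def timeStringAsMs(str: String): Long = str.trim.toLowerCase match {
    case Pattern(value, suffix) =>
      val n = value.toLong
      suffix match {
        case "ms" | null => n                  // bare numbers are treated as millis
        case "s"         => n * 1000L
        case "m" | "min" => n * 60L * 1000L
        case "h"         => n * 60L * 60L * 1000L
      }
    case other =>
      throw new NumberFormatException(s"Invalid duration string: $other")
  }
}
```

With this, a config value of `"100ms"` resolves to `100` and `"1s"` to `1000`, which is exactly why the review prefers duration strings over raw millisecond integers.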
Do you need this to be provided as a Map? You already have `sc` being passed in with all the configurations.
+1. I think reading the config values from sc will be a much cleaner approach.
Force-pushed from ccb6c19 to f51367f
@brkyvz - thanks for taking the time to review the patch, appreciate it. As for the spark context, I wanted to use the SparkContext available in
The patch now does not use the
Let me know your thoughts. Thanks again for the review comments.
Just for info, while trying to use the
Waiting for review, @brkyvz. Thanks.
No, this value is modified after waits:
kinesisWaitTimeMs *= 2 // if you have waited, then double wait time for next round
I don't think you want to do this: kinesisWaitTimeMs is never reset to the default value after the retry loop exits. I think you should make this a val and introduce a var initialized to its value within retryOrTimeout() to store the wait time for each retry iteration.
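budde's suggestion can be sketched roughly as follows. The object and method shapes here are hypothetical stand-ins for the patch's actual `retryOrTimeout()`, written only to show why a local `var` keeps the exponential backoff from leaking across calls.

```scala
// Sketch only: keep the configured wait as an immutable default and double a
// local copy inside each retry loop, so the doubled value never leaks into
// the next call of retryOrTimeout().
object RetrySketch {
  val kinesisWaitTimeMs: Long = 100L // configured default, never mutated
  val maxRetries: Int = 3

  def retryOrTimeout[T](message: String)(body: => T): T = {
    var waitTimeMs = kinesisWaitTimeMs // local copy, reset on every call
    var retryCount = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(body)
      } catch {
        // Retry on failure until the attempt budget is spent; after that the
        // exception propagates to the caller.
        case t: Throwable if retryCount < maxRetries =>
          Thread.sleep(waitTimeMs)
          waitTimeMs *= 2 // double wait time for next round
          retryCount += 1
      }
    }
    result.get
  }
}
```

Because `waitTimeMs` is initialized from the `val` at the top of each call, a second invocation starts again from the configured default rather than from the doubled value of the previous run.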
Not a Spark committer, but I've contributed to this component in the past. I would strongly prefer an approach that avoids adding an additional parameter to all of the Kinesis classes if the
If using SparkConf to store these custom config values doesn't end up being feasible then I'd strongly prefer that we follow the existing approach and have separate builder methods for setting the retry wait time and max attempts.
Do you think it would be better to pass values to the builder rather than a map of configs? I thought a map of configs can be easily extended when we need to support new configurations, without code changes.
What is your thought on values + builder per config vs. one map for all configs?
If you want the extensibility of a key/value map for configs then I would go the route of getting a solution that uses SparkConf to do that in order to use the existing facilities provided by Spark. It doesn't make sense to me to introduce a key/value map just for Kinesis, especially since the naming of your keys (e.g. spark.streaming.kinesis.retry.waitTime) would indicate to most users that these are SparkConf params, not a Kinesis-specific mapping that must be manually set up and passed to the Kinesis stream builder.
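To illustrate budde's point, resolving these keys through Spark's normal configuration (rather than a Kinesis-only map) might look like the sketch below. A plain `Map` stands in for `SparkConf` so the snippet runs standalone; the key names and defaults are taken from the review discussion, not from the merged code.

```scala
// Hypothetical resolution of the proposed keys. In Spark proper this would be
// sc.getConf.get(key, default) / sc.getConf.getInt(key, default).
object KinesisConfSketch {
  val RetryWaitTimeKey    = "spark.streaming.kinesis.retry.waitTime"
  val RetryMaxAttemptsKey = "spark.streaming.kinesis.retry.maxAttempts"

  // Returns (wait-time duration string, max attempts), falling back to the
  // defaults discussed in review when a key is absent.
  def resolve(conf: Map[String, String]): (String, Int) = {
    val waitTime    = conf.getOrElse(RetryWaitTimeKey, "100ms")
    val maxAttempts = conf.getOrElse(RetryMaxAttemptsKey, "3").toInt
    (waitTime, maxAttempts)
  }
}
```

The advantage is exactly what the comment above argues: users who see `spark.streaming.kinesis.*` keys will expect to set them as ordinary SparkConf parameters, and this resolution path honors that expectation.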
Also, please move the + to the line above.
How about something like this?
```scala
private[kinesis] case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations = {
    KinesisReadConfigurations(3, 100, 10000)
  }

  def apply(ssc: StreamingContext): KinesisReadConfigurations = {
    KinesisReadConfigurations(
      maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
      retryWaitTimeMs = JavaUtils.timeStringAsMs(
        ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
      retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
  }

  val DEFAULT_MAX_RETRIES = 3
  val DEFAULT_RETRY_WAIT_TIME = "100ms"
}
```
Mind putting these in a try-finally block?
@yssharma left some comments
Force-pushed from 6a61218 to 6a2950f
@brkyvz - Thanks for the review comments. Updated the patch, please review.
budde
left a comment
Took a look and left a few comments, mostly around some style and formatting issues. On the whole, I'm not sure how I feel about introducing the KinesisReadConfigurations case class if it's just being used to pull a couple config values out from SparkConf and pass them to KinesisBackedBlockRDD which itself is already taking a SparkContext as an argument. Reading the config values in the constructor and storing them as val fields should resolve the serialization issues you were seeing while avoiding having to serialize the entire SparkConf. I'll defer to @brkyvz on this issue though.
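The alternative budde describes here (read the config values once in the constructor and keep them as `val` fields, so only those small fields are serialized with the RDD rather than the whole `SparkConf`) could be sketched like this; the class shape and key names are illustrative, not the patch's actual code.

```scala
// Hypothetical sketch of "read configs in the constructor, store as vals".
// A Map of conf entries stands in for SparkContext/SparkConf so it runs
// standalone; only the two resolved primitives travel with the serialized RDD.
class KinesisBackedBlockRDDSketch(confEntries: Map[String, String])
    extends Serializable {

  // Resolved eagerly on the driver; plain vals serialize cheaply.
  val retryWaitTimeMs: Long = confEntries
    .get("spark.streaming.kinesis.retry.waitTime")
    .map(_.stripSuffix("ms").toLong)
    .getOrElse(100L)

  val maxRetries: Int = confEntries
    .get("spark.streaming.kinesis.retry.maxAttempts")
    .map(_.toInt)
    .getOrElse(3)
}
```

As the thread below works out, the `KinesisReadConfigurations` case class ends up preferable because the same values are also needed in places that have no `SparkContext` at hand.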
nit: You're missing a space here and on the following line
Incorrect indentation here: should be 2 soft tabs / 4 spaces.
I would use constants and named parameters here too, e.g.
def apply(): KinesisReadConfigurations = KinesisReadConfigurations(
  maxRetries = DEFAULT_MAX_RETRIES,
  ...
brkyvz
left a comment
Looking much better now. Left a couple more comments
Actually, do we even need this?
It can be used in places where we don't have the spark conf. I am using this in KinesisBackedBlockRDD's constructor.
These also need to be in the finally.
Thanks for the comments @budde @brkyvz. Will be adding the changes soon.
I really prefer the case class. It's used in places where we don't pass in
Fair enough. I took another look and I think I may have been thinking of the way things worked in an earlier revision of this code. I think the case class is reasonable.
I think it would be sufficient to change this to `kinesisReadConfigs = KinesisReadConfigurations(ssc))` and omit lines 65-70. I don't think a comment is necessary here; the code is pretty straightforward.
Force-pushed from 6a2950f to 91a777c
LGTM! Merging to master/branch-2.2
…tries

## What changes were proposed in this pull request?

The pull request proposes to remove the hardcoded values for Amazon Kinesis - MIN_RETRY_WAIT_TIME_MS, MAX_RETRIES. This change is critical for kinesis checkpoint recovery when the kinesis-backed RDD is huge.

The following happens in a typical kinesis recovery:
- kinesis throttles a large number of requests while recovering
- retries in case of throttling are not able to recover due to the small wait period
- kinesis throttles per second, so the wait period should be configurable for recovery

The patch picks the spark kinesis configs from:
- spark.streaming.kinesis.retry.wait.time
- spark.streaming.kinesis.retry.max.attempts

Jira: https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified KinesisBackedBlockRDDSuite.scala to run kinesis tests with the modified configurations. Wasn't able to test the patch with actual throttling.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.

(cherry picked from commit 38f4e86)
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
## What changes were proposed in this pull request?

The changes were merged as part of apache#17467. The documentation was missed somewhere in the review iterations. Adding the documentation where it belongs.

## How was this patch tested?

Docs. Not tested.

cc budde, brkyvz

Author: Yash Sharma <ysharma@atlassian.com>

Closes apache#18028 from yssharma/ysharma/kinesis_retry_docs.
(cherry picked from commit 92580bd)
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
What changes were proposed in this pull request?

The pull request proposes to remove the hardcoded values for Amazon Kinesis - MIN_RETRY_WAIT_TIME_MS, MAX_RETRIES. This change is critical for kinesis checkpoint recovery when the kinesis-backed RDD is huge.

The following happens in a typical kinesis recovery:
- kinesis throttles a large number of requests while recovering
- retries in case of throttling are not able to recover due to the small wait period
- kinesis throttles per second, so the wait period should be configurable for recovery

The patch picks the spark kinesis configs from:
- spark.streaming.kinesis.retry.wait.time
- spark.streaming.kinesis.retry.max.attempts

Jira: https://issues.apache.org/jira/browse/SPARK-20140

How was this patch tested?

Modified KinesisBackedBlockRDDSuite.scala to run kinesis tests with the modified configurations. Wasn't able to test the patch with actual throttling.
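From a user's perspective, the new settings would then be supplied as ordinary Spark configuration. The sketch below uses the key names exactly as quoted in this description; note that the review discussion also floated `retry.waitTime` / `retry.maxAttempts` spellings, so check the merged code for the final names.

```scala
import org.apache.spark.SparkConf

// Illustrative values: wait 1s between Kinesis retries, allow 10 attempts.
// Useful when recovering a large kinesis-backed RDD under per-second throttling.
val conf = new SparkConf()
  .setAppName("kinesis-recovery-example")
  .set("spark.streaming.kinesis.retry.wait.time", "1000ms")
  .set("spark.streaming.kinesis.retry.max.attempts", "10")
```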