[SPARK-11749][Streaming] Duplicate creating the RDD in file stream when recovering from checkpoint data #9765
Conversation
I don't get this one. If
@zsxwing
I see. Could you resolve the conflicts?
this can be private
@zsxwing
Jenkins, test this please
Test build #47660 has started for PR 9765 at commit
retest this please
jenkins test this please
jenkins test this please
Test build #47672 has finished for PR 9765 at commit
I suggest writing a simpler test that only tests this issue, such as:
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
  ssc.checkpoint(checkpointDir)
  val inputDStream = new CheckpointInputDStream(ssc)
  val checkpointData = inputDStream.checkpointData
  val mappedDStream = inputDStream.map(_ + 100)
  val outputStream = new TestOutputStreamWithPartitions(mappedDStream)
  outputStream.register()
  // Register two more output operations on the same mapped stream
  mappedDStream.foreachRDD(rdd => rdd.count())
  mappedDStream.foreachRDD(rdd => rdd.count())
  assert(checkpointData.restoredTimes === 0)
  val batchDurationMillis = ssc.progressListener.batchDuration
  generateOutput(ssc, Time(batchDurationMillis * 3), checkpointDir, stopSparkContext = false)
  assert(checkpointData.restoredTimes === 0)
}
withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
  val checkpointData =
    ssc.graph.getInputStreams().head.asInstanceOf[CheckpointInputDStream].checkpointData
  ssc.start()
  ssc.stop()
  assert(checkpointData.restoredTimes === 1)
}
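For readers following along, here is a hypothetical sketch of what a CheckpointInputDStream test helper with a restoredTimes counter could look like. It is illustrative only, not the PR's actual helper; it assumes Spark's InputDStream and DStreamCheckpointData classes and would need to live in the org.apache.spark.streaming package, since those are private[streaming].

package org.apache.spark.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStreamCheckpointData, InputDStream}

// Illustrative input stream whose checkpoint data counts how many times restore() runs.
class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {

  // Override checkpointData with an implementation that records restore() calls.
  protected[streaming] override val checkpointData = new RestoreCountingCheckpointData

  override def start(): Unit = { }
  override def stop(): Unit = { }
  override def compute(time: Time): Option[RDD[Int]] = {
    Some(ssc.sparkContext.makeRDD(1 to 10))
  }

  class RestoreCountingCheckpointData extends DStreamCheckpointData(this) {
    var restoredTimes = 0
    override def restore(): Unit = {
      restoredTimes += 1
      super.restore()
    }
  }
}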
retest this please
1 similar comment
retest this please
Looks good, merging into master once tests pass.
retest this please
Test build #47752 has finished for PR 9765 at commit
It should be TestOutputStreamWithPartitions if using generateOutput.
@zsxwing
retest this please
1 similar comment
retest this please
Test build #47832 has finished for PR 9765 at commit
nit: remove this unused line
You can remove outputBuffer and change val outputStream = new TestOutputStreamWithPartitions(mappedDStream, outputBuffer) to val outputStream = new TestOutputStreamWithPartitions(mappedDStream) since the output buffer is not necessary now.
Just two nits. Otherwise LGTM
@zsxwing
retest this please
Test build #47936 has finished for PR 9765 at commit
retest this please?
Test build #47942 has finished for PR 9765 at commit
OOM happened in the PySpark tests. Environment issue?
retest this please
Test build #47966 has finished for PR 9765 at commit
@jhu-chang thanks, merging to master and 1.6
[SPARK-11749][Streaming] Duplicate creating the RDD in file stream when recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and runs the restore process only when it is `false`.

Author: jhu-chang <[email protected]>

Closes #9765 from jhu-chang/SPARK-11749.

(cherry picked from commit f4346f6)
Signed-off-by: Shixiong Zhu <[email protected]>
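For illustration only, a minimal self-contained Scala sketch of the guard-flag pattern the commit describes. This is not the PR's actual diff: SketchStream, the println, and the demo object are made-up stand-ins. The point is that a transient flag lets the shared upstream stream be restored only once, even though several registered outputs traverse it during checkpoint recovery.

// Illustrative sketch of the guard-flag pattern (not Spark's real DStream code).
class SketchStream(name: String, dependencies: Seq[SketchStream]) extends Serializable {
  // Transient: after the object is deserialized from a checkpoint this is false again,
  // so the restore runs exactly once per recovery, not once per output operation.
  @transient private var restoredFromCheckpointData = false

  def restoreCheckpointData(): Unit = {
    if (!restoredFromCheckpointData) {
      println(s"restoring checkpoint data for $name") // stand-in for the real restore work
      dependencies.foreach(_.restoreCheckpointData())
      restoredFromCheckpointData = true
    }
  }
}

object SketchStreamDemo extends App {
  // Two output streams sharing one input: without the flag, "input" would be restored twice.
  val input = new SketchStream("input", Nil)
  val out1  = new SketchStream("out1", Seq(input))
  val out2  = new SketchStream("out2", Seq(input))
  Seq(out1, out2).foreach(_.restoreCheckpointData())
}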