-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25399][SS] Continuous processing state should not affect microbatch execution jobs #22386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@tdas and @jose-torres for review |
|
Test build #95908 has finished for PR 22386 at commit
|
|
|
||
| test("is_continuous_processing property should be true for continuous processing") { | ||
| val input = ContinuousMemoryStream[Int] | ||
| var x: String = "" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
Little confuse about this scenario, could you explain more? I mean its only happened in UT or we may meet this on product env? |
jose-torres
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM other than 1 nit + the comment from @xuanyuanking
| } | ||
|
|
||
| object ContinuousExecution { | ||
| val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this belongs in StreamExecution, since both ContinuousExecution and MicroBatchExecution set it.
| val currentVersion = EpochTracker.getCurrentEpoch match { | ||
| case None => storeVersion | ||
| case Some(value) => value | ||
| val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just simple toBoolean here is OK?Cause you set default value both MicroBatch and Continuous side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'd rather keep it as is to be more resilient for the future.
@xuanyuanking It theoretically could have been encountered in production, but continuous processing is considered an experimental feature. The only way to encounter it in production is to run a continuous processing stream and then a microbatch stream in the same spark cluster and have an execution thread get reused. The bug is in StateStoreRDD; EpochTracker sets a ThreadLocal variable called currentEpoch and StateStoreRDD checks for the existence of this variable to decide if the current streaming job is continuous or microbatch. |
| case Some(value) => value | ||
| val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING)) | ||
| .map(_.toBoolean) | ||
| val currentVersion = if (isContinuous.contains(true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: this looks weird. rather i would do change the previous line val isContinuous = ... .map(_.toBoolean).getOrElse(false)
|
LGTM. Just one super nit. |
|
Test build #95959 has finished for PR 22386 at commit
|
|
Test build #95961 has finished for PR 22386 at commit
|
…batch execution jobs ## What changes were proposed in this pull request? The leftover state from running a continuous processing streaming job should not affect later microbatch execution jobs. If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment, the microbatch job could get wrong answers because it can attempt to load the wrong version of the state. ## How was this patch tested? New and existing unit tests Closes #22386 from mukulmurthy/25399-streamthread. Authored-by: Mukul Murthy <[email protected]> Signed-off-by: Tathagata Das <[email protected]> (cherry picked from commit 9f5c5b4) Signed-off-by: Tathagata Das <[email protected]>
|
Great thanks for your comment and fix @mukulmurthy! We'll also port this to our folk. |
…batch execution jobs ## What changes were proposed in this pull request? The leftover state from running a continuous processing streaming job should not affect later microbatch execution jobs. If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment, the microbatch job could get wrong answers because it can attempt to load the wrong version of the state. ## How was this patch tested? New and existing unit tests Closes apache#22386 from mukulmurthy/25399-streamthread. Authored-by: Mukul Murthy <[email protected]> Signed-off-by: Tathagata Das <[email protected]>
What changes were proposed in this pull request?
The leftover state from running a continuous processing streaming job should not affect later microbatch execution jobs. If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment, the microbatch job could get wrong answers because it can attempt to load the wrong version of the state.
How was this patch tested?
New and existing unit tests