[SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution. #19926
Test build #84625 has finished for PR 19926 at commit
Test build #84635 has finished for PR 19926 at commit
Do we really need explicit typing here?
I think the reader can easily infer from the code that it is a String-typed variable.
Will the noNewData flag still be useful for continuous processing?
Yes. The flag is really just a test harness; it's only used in processAllAvailable, so tests can block until there's a batch (or now epoch) that doesn't contain any data.
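The role of the flag described above can be sketched roughly as follows. This is a minimal illustration using a hypothetical `ToyExecution` class, not Spark's actual StreamExecution; the `finishBatch` method and the locking details are assumptions made for the example.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for a stream execution; not Spark's real class.
class ToyExecution {
  private val lock = new ReentrantLock()
  private val noDataCondition = lock.newCondition()
  private var noNewData = false

  // Called by the execution loop after each batch (or epoch) completes.
  def finishBatch(hadData: Boolean): Unit = {
    lock.lock()
    try {
      noNewData = !hadData
      if (noNewData) noDataCondition.signalAll()
    } finally lock.unlock()
  }

  // Test helper: block until a later batch (or epoch) completes without data.
  def processAllAvailable(): Unit = {
    lock.lock()
    try {
      noNewData = false
      while (!noNewData) noDataCondition.await()
    } finally lock.unlock()
  }
}
```

Because the flag is reset before waiting, the caller only returns once a batch that finished after the call reported no data, which is the property a test needs.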
Shall we make the class name configurable?
Sorry, I'm not sure what you have in mind here.
I mean, how do we switch between ContinuousExecution and MicroBatchExecution?
My current thinking is to have it be a new trigger type. It can't really be a config, because continuous processing (at least in the initial implementation) won't support all operators.
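To make the trigger-based idea concrete, here is a rough sketch of how a trigger type could select the execution engine and reject queries the continuous engine cannot run. All names here (`MicroBatchTrigger`, `ContinuousTrigger`, `ExecutionFactory`, and the supported-operator set) are hypothetical and chosen for illustration; they are not taken from this PR.

```scala
// Hypothetical trigger and execution types, for illustration only.
sealed trait Trigger
case object MicroBatchTrigger extends Trigger
case object ContinuousTrigger extends Trigger

sealed trait Execution
final case class MicroBatch(operators: Seq[String]) extends Execution
final case class Continuous(operators: Seq[String]) extends Execution

object ExecutionFactory {
  // Assumed set of operators the continuous engine can handle.
  val continuousSupported: Set[String] = Set("map", "filter", "project")

  // The trigger chosen by the user decides which engine runs the query.
  def create(trigger: Trigger, operators: Seq[String]): Either[String, Execution] =
    trigger match {
      case MicroBatchTrigger => Right(MicroBatch(operators))
      case ContinuousTrigger =>
        val unsupported = operators.filterNot(continuousSupported)
        if (unsupported.isEmpty) Right(Continuous(operators))
        else {
          val names = unsupported.mkString(", ")
          Left("continuous processing does not support: " + names)
        }
    }
}
```

Tying the choice to the trigger makes the unsupported-operator check part of starting the query, whereas a global config could silently flip existing queries onto an engine that cannot run them.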
We may want to add a blank line above this.
Since this is a base class for both microbatch and continuous processing, is it right to put this variable here?
We may want to tweak the variable name, but continuous processing will still need to know how long it should retain commit and offset log entries. Unfortunately we're stuck with the config name, and I don't think it makes sense to introduce a second parallel one doing the same thing.
Yes, tweaking the variable names may make it look better.
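As an illustration of why both execution modes need a retention setting, the sketch below purges old log entries while keeping the most recent ones. `ToyMetadataLog`, `Retention`, and the `minEntriesToRetain` parameter are hypothetical names for this example and do not reflect Spark's actual log classes or config.

```scala
import scala.collection.mutable

// Hypothetical in-memory log keyed by batch (or epoch) id.
class ToyMetadataLog {
  private val entries = mutable.TreeMap.empty[Long, String]

  def add(id: Long, value: String): Unit = entries(id) = value

  // Remove every entry whose id is strictly below thresholdId.
  def purge(thresholdId: Long): Unit = {
    val stale = entries.keysIterator.takeWhile(_ < thresholdId).toList
    stale.foreach(id => entries.remove(id))
  }

  def ids: Seq[Long] = entries.keys.toSeq
}

object Retention {
  // Keep the last minEntriesToRetain entries, ending at currentId.
  def purgeOld(log: ToyMetadataLog, currentId: Long, minEntriesToRetain: Int): Unit =
    log.purge(currentId - minEntriesToRetain + 1)
}
```

The same routine works whether the ids are microbatch ids or epoch ids, which is consistent with keeping a single setting in the base class.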
While this part of the code is removed, offsetLog is still in the base class, and the same goes for batchCommitLog.
offsetLog may be needed as a WAL, but should batchCommitLog be moved to MicroBatchStreamExecution?
The offset log right now has a strict schema that commit information wouldn't fit in. I was planning to keep both logs in the continuous implementation.
So, shall we also make them null here and let child classes override them?
Sure, we could do that.
Test build #84787 has finished for PR 19926 at commit
retest this please
Test build #84810 has finished for PR 19926 at commit
Test build #84801 has finished for PR 19926 at commit
Test build #84811 has finished for PR 19926 at commit
696ed5f to
baaa933
Compare
Test build #84923 has finished for PR 19926 at commit
zsxwing left a comment
LGTM except one minor comment
 * fully processed, and its output was committed to the sink, hence no need to process it again.
 * This is used (for instance) during restart, to help identify which batch to run next.
 */
val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
Let's keep batchCommitLog and offsetLog in the base class, since both subclasses need to initialize them. We can also rename batchCommitLog to commitLog to make it more general.
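The doc comment quoted above says the commit log helps identify which batch to run next on restart. A simplified sketch of that decision, assuming each log only exposes the id of its latest entry, might look like this. `RestartLogic` and `nextBatchToRun` are hypothetical names; the real restart path handles more cases.

```scala
object RestartLogic {
  // latestOffsetId: id of the last batch whose offsets were written ahead of processing.
  // latestCommitId: id of the last batch whose output was committed to the sink.
  def nextBatchToRun(latestOffsetId: Option[Long], latestCommitId: Option[Long]): Long =
    latestOffsetId match {
      // Nothing was ever planned: start from the first batch.
      case None => 0L
      // The last planned batch was also committed: move on to the next one.
      case Some(id) if latestCommitId.contains(id) => id + 1
      // The last planned batch was not committed: run it again.
      case Some(id) => id
    }
}
```

Since both a microbatch and a continuous implementation would need this pair of logs, keeping them together in the base class under a neutral name fits the decision in the comment above.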
brkyvz left a comment
LGTM! Really excited to see this move forward
val triggerLogicalPlan = withNewSources transformAllExpressions {
  case a: Attribute if replacementMap.contains(a) =>
    replacementMap(a).withMetadata(a.metadata)
  case ct: CurrentTimestamp =>
Have we thought about how these will work with ContinuousProcessing? Will they be set at each start of the epoch?
That's a major candidate solution, but we're planning to just not support CurrentTimestamp for the initial implementation. It would require significant changes, since control flow won't return here between epochs.
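To illustrate the control-flow point, the sketch below rewrites a toy expression tree once per batch, replacing the current-timestamp node with a fixed value. The `Expr` types and `TimestampPinning` object are hypothetical and much simpler than Catalyst expressions; the assumption is only that the microbatch path substitutes a per-batch timestamp when it builds each batch's plan.

```scala
// Hypothetical miniature expression tree, not Catalyst.
sealed trait Expr
case object CurrentTimestamp extends Expr
final case class Literal(value: Long) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object TimestampPinning {
  // Replace every CurrentTimestamp node with the timestamp chosen for this batch.
  def pin(expr: Expr, batchTimeMs: Long): Expr = expr match {
    case CurrentTimestamp => Literal(batchTimeMs)
    case Add(l, r)        => Add(pin(l, batchTimeMs), pin(r, batchTimeMs))
    case lit: Literal     => lit
  }
}
```

A rewrite like this needs the driver to rebuild the plan for each batch. In a continuous execution, where control does not return to this code between epochs, there is no natural place to call it, which matches the reason given for not supporting CurrentTimestamp initially.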
Test build #84924 has finished for PR 19926 at commit
Test build #84925 has finished for PR 19926 at commit
Test build #84926 has finished for PR 19926 at commit
LGTM. Merging to master!
What changes were proposed in this pull request?
StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution.
A few fields are also renamed to make them less microbatch-specific.
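The shape of the refactoring can be sketched as below. This is a heavily simplified, hypothetical model: the class names follow the PR description, but the members (`runStream`, `runActivatedStream`, and a `commitLog` modelled as a plain vector of ids) are assumptions for illustration and not the actual Spark signatures.

```scala
// Hypothetical, heavily simplified stand-ins for the classes named in this PR.
abstract class StreamExecution(val name: String) {
  // State shared by every execution mode lives in the base class.
  protected var commitLog: Vector[Long] = Vector.empty

  // Shared lifecycle: reset state, then hand control to the subclass.
  final def runStream(): Seq[Long] = {
    commitLog = Vector.empty
    runActivatedStream()
    commitLog
  }

  // Each execution mode supplies its own processing loop.
  protected def runActivatedStream(): Unit
}

class MicroBatchExecution(name: String, batches: Int) extends StreamExecution(name) {
  // Microbatch mode: process a fixed number of batches and commit each one.
  override protected def runActivatedStream(): Unit =
    (0L until batches.toLong).foreach { batchId => commitLog = commitLog :+ batchId }
}
```

A future ContinuousExecution would extend the same base class and provide a different `runActivatedStream`, reusing the shared lifecycle and logs.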
How was this patch tested?
refactoring only