[SPARK-19876][SS][WIP] OneTime Trigger Executor #17219
marmbrus
left a comment
This is great! Thanks for working on it.
| } | ||
| val version = lines.next() | ||
| if (version != OffsetCommitLog.VERSION) { | ||
| throw new IllegalStateException(s"Unknown log version: ${version}") |
We should make sure the error here is consistent with the work being done in #17070
|
|
||
| object OffsetCommitLog { | ||
| private val VERSION = "v1" | ||
| private val SERIALIZED_VOID = "-" |
Why not make this an empty JSON object? {}
| * df.write.trigger(OneTime) | ||
| * }}} | ||
| * | ||
| * Java Example: |
I don't think this works?
Yes, this doesn't work. Please fix them.
|
|
||
| import org.apache.spark.sql.SparkSession | ||
|
|
||
| class OffsetCommitLog(sparkSession: SparkSession, path: String) |
Scala doc please
| finishTrigger(dataAvailable) | ||
| if (dataAvailable) { | ||
| // We'll increase currentBatchId after we complete processing current batch's data | ||
| commitLog.add(currentBatchId, None) |
I wonder if we should make this async?
| case lastOffsets => | ||
| committedOffsets = lastOffsets.toStreamProgress(sources) | ||
| logDebug(s"Resuming with committed offsets: $committedOffsets") | ||
| currentBatchId = commitLog.getLatest() match { |
Mind adding a few more comments here? This logic is getting very dense. I think that it's doing something like the following:
- finding the max committed batch
- checking to see if there is a started but uncommitted batch
- otherwise constructing a new batch
Test build #74231 has finished for PR 17219 at commit
Test build #74282 has finished for PR 17219 at commit
…rce) after restart
Test build #74328 has finished for PR 17219 at commit
… an empty call to getBatch on each source using the last committed offset as the end offset
Test build #74333 has finished for PR 17219 at commit
Test build #74347 has finished for PR 17219 at commit
Test build #74365 has finished for PR 17219 at commit
Test build #74465 has finished for PR 17219 at commit
|
||
| /** | ||
| * A log that records the committed batch ids. This is used to check if a batch was committed | ||
| * on restart, instead of (possibly) re-running the previous batch. |
nit: "if a batch was committed on restart" sounds like batches are supposed to get committed only on restart. :)
Also, keep the comment generic so that it does not imply that only the previous batch will be re-run. In the future we could rerun multiple batches.
| // called inside a try-finally where the underlying stream is closed in the caller | ||
| val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() | ||
| if (!lines.hasNext) { | ||
| throw new IllegalStateException("Incomplete log file") |
Can you say "incomplete log file in the offset commit log"?
| } | ||
|
|
||
| object OffsetCommitLog { | ||
| private val VERSION = 1 |
Let's be consistent with the other logs and write "v1" for the version, not "1".
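To make the format suggestions in this thread concrete, here is a minimal sketch of a commit-log entry that uses a "v1" version line followed by an empty JSON object, with the error messages requested in the review. It is written in Python for brevity (the PR's code is Scala), and the function names `serialize_commit_entry` and `parse_commit_entry` are hypothetical, not part of the patch.

```python
import json

# Assumed to follow the "v1" convention requested in the review.
VERSION = "v1"


def serialize_commit_entry() -> str:
    """Return the entry text: a version line, then an empty JSON object."""
    return VERSION + "\n" + json.dumps({})


def parse_commit_entry(text: str) -> dict:
    """Validate the version header and return the (currently empty) payload."""
    lines = text.splitlines()
    if not lines:
        raise ValueError("Incomplete log file in the offset commit log")
    version = lines[0].strip()
    if version != VERSION:
        raise ValueError(f"Unknown log version: {version}")
    if len(lines) < 2:
        raise ValueError("Incomplete log file in the offset commit log")
    return json.loads(lines[1])
```

An empty JSON object as the payload would leave room for adding fields later without changing how the entry is parsed.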
| * The basic structure of this method is as follows: | ||
| * | ||
| * Identify (from the offset log) the offsets used to run the last batch | ||
| * IF a last batch exists THEN |
"a last batch" is grammatically weird, isn't it?
| * | ||
| * Identify (from the offset log) the offsets used to run the last batch | ||
| * IF a last batch exists THEN | ||
| * Set the next batch to that last batch |
Maybe "set the next batch to be executed as the last recovered batch"
| * Set the next batch to that last batch | ||
| * Check the commit log to see which batch was committed last | ||
| * IF the last batch was committed THEN | ||
| * Call getBatch using the last batch start and end offsets |
Add the reason regarding why we do this.
|
|
||
| offsetLog.get(batchId - 1).foreach { | ||
| case lastOffsets => | ||
| if (batchId > 0) { |
why are we introducing this condition?
| */ | ||
| private def populateStartOffsets(): Unit = { | ||
| offsetLog.getLatest() match { | ||
| case Some((batchId, nextOffsets)) => |
can you rename batchId to something more descriptive so that we can semantically differentiate it from the currentBatchId?
| if (batchId < currentBatchId) { | ||
| /* The last batch was successfully committed, so we can safely process a | ||
| * new next batch but first: | ||
| * Make a call a call to getBatch using the offsets from previous batch. |
"a call" is present twice.
also when referring to getBatch, use source.getBatch to be more clear.
| if batchId == committedBatchId => committedBatchId + 1 | ||
| case _ => batchId | ||
| } | ||
| if (batchId < currentBatchId) { |
The above match-case and this if statement are essentially the same semantic condition: both will be true or both will be false. So we might as well merge the two into a single if condition.
val completedBatchId = completedLog.getLatest()
if (completedBatchId.isDefined && completedBatchId.get == batchId) {
// call source.getBatch
currentBatchId = completedBatchId.get + 1
} else {
// warn if completedBatchId.get < batchId - 1
currentBatchId = batchId
}
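The merged condition can also be read as a small pure function. The sketch below is in Python rather than the PR's Scala, and it is only an illustration of the decision described in this comment: the name `resolve_current_batch_id` and the `rerun_latest` flag are hypothetical, and starting at batch 0 when the offset log is empty is an assumption.

```python
from typing import Optional, Tuple


def resolve_current_batch_id(
    latest_offset_batch_id: Optional[int],
    latest_committed_batch_id: Optional[int],
) -> Tuple[int, bool]:
    """Return (current_batch_id, rerun_latest) for a restarting query.

    rerun_latest is True when the latest batch in the offset log has no
    matching commit record and therefore has to be processed again.
    """
    if latest_offset_batch_id is None:
        # Nothing was ever planned (assumption: the first batch id is 0).
        return 0, False
    if latest_committed_batch_id == latest_offset_batch_id:
        # The latest planned batch was committed: move on to a new batch.
        return latest_committed_batch_id + 1, False
    # The latest planned batch was started but not committed: run it again.
    return latest_offset_batch_id, True
```

For example, offsets logged up to batch 3 with commits up to batch 2 yields `(3, True)`, while commits up to batch 3 yields `(4, False)`.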
| * | ||
| * @since 2.2.0 | ||
| */ | ||
| @Experimental |
You need Python APIs as well.
| self._jwrite = self._jwrite.queryName(queryName) | ||
| return self | ||
|
|
||
| @keyword_only |
Removed keyword_only; otherwise it's weird if we have to write
writeStream.trigger(trigger=OneTime())
python/pyspark/sql/streaming.py
Outdated
| @keyword_only | ||
| @since(2.0) | ||
| def trigger(self, processingTime=None): | ||
| def trigger(self, trigger=None, processingTime=None): |
Added this before processingTime so that we can use positional param and write trigger(OneTime())
Does not break existing APIs. See doctest examples.
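A self-contained sketch of why this parameter order keeps both call styles working is below. `OneTime` and `WriterSketch` are hypothetical stand-ins, not the pyspark classes, and the argument validation is an assumption rather than something taken from the patch.

```python
class OneTime:
    """Hypothetical stand-in for the OneTime trigger object."""


class WriterSketch:
    """Hypothetical stand-in for the stream writer; records the chosen trigger."""

    def __init__(self):
        self.chosen = None

    def trigger(self, trigger=None, processingTime=None):
        # Assumed validation: exactly one of the two arguments must be given.
        if (trigger is None) == (processingTime is None):
            raise ValueError("Specify exactly one of trigger or processingTime")
        if trigger is not None:
            if not isinstance(trigger, OneTime):
                raise TypeError("trigger must be a OneTime instance")
            self.chosen = ("once", None)
        else:
            self.chosen = ("processingTime", processingTime)
        return self


# New positional style and the pre-existing keyword style both work.
new_style = WriterSketch().trigger(OneTime())
old_style = WriterSketch().trigger(processingTime="5 seconds")
```

Because `trigger` comes first, it can be passed positionally, while existing callers that use `processingTime=` as a keyword are unaffected.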
Test build #75045 has finished for PR 17219 at commit
Test build #75074 has finished for PR 17219 at commit
Test build #75077 has finished for PR 17219 at commit
Test build #75080 has finished for PR 17219 at commit
Test build #75086 has started for PR 17219 at commit
Jenkins test this please
Test build #3607 has finished for PR 17219 at commit
Test build #75091 has finished for PR 17219 at commit
Test build #75108 has finished for PR 17219 at commit
Seems like this broke the 2.10 builds:
## What changes were proposed in this pull request?
An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.
In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself to determine what batch to process next after restart; using the offset log to determine this would process the previously logged batch, always, thus not permitting a OneTime trigger feature.
## How was this patch tested?
A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.
In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.
marmbrus tdas zsxwing
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Tyson Condie <[email protected]>
Author: Tathagata Das <[email protected]>
Closes apache#17219 from tcondie/stream-commit.
(cherry picked from commit 746a558)
[SPARK-19876][BUILD] Move Trigger.java to java source hierarchy
## What changes were proposed in this pull request?
Simply moves `Trigger.java` to `src/main/java` from `src/main/scala`
See apache#17219
## How was this patch tested?
Existing tests.
Author: Sean Owen <[email protected]>
Closes apache#17921 from srowen/SPARK-19876.2.
(cherry picked from commit 25ee816)
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit c7bd909)
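As a rough illustration of the "execute a single trigger only" semantics described in the PR description, the Python sketch below contrasts a one-shot executor with a repeating one. Both class names are hypothetical, the repeating executor omits any interval waiting, and the convention that the batch function returns a boolean "keep going" flag is an assumption made for the sketch.

```python
from typing import Callable


class OneTimeExecutorSketch:
    """Runs the batch function exactly once, whatever it returns."""

    def execute(self, run_batch: Callable[[], bool]) -> None:
        run_batch()


class RepeatingExecutorSketch:
    """Keeps running batches until the batch function returns False."""

    def execute(self, run_batch: Callable[[], bool]) -> None:
        while run_batch():
            pass


calls = []


def run_batch() -> bool:
    # Record the call and ask to continue until three batches have run.
    calls.append(len(calls))
    return len(calls) < 3


OneTimeExecutorSketch().execute(run_batch)
calls_after_one_time = list(calls)
RepeatingExecutorSketch().execute(run_batch)
calls_after_repeating = list(calls)
```

With a one-shot executor, scheduling of further batches is left to whoever restarts the query, which is why the restart path needs the commit log to know whether the last logged batch already finished.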