[SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch. #17216
Conversation
case class OffsetSeqMetadata(
    var batchWatermarkMs: Long = 0,
    var batchTimestampMs: Long = 0,
    var numShufflePartitions: Int = 0) {
It's better to use conf: Map[String, String] here, because we will probably add more confs to this class in the future.
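For illustration, a minimal sketch of the suggested shape, assuming the conf map is keyed by SQLConf keys (the usage snippet is illustrative, not the final API):

// Sketch: metadata stored in the offset log, with batch-specific confs kept in a
// generic map so new entries can be added later without another schema change.
case class OffsetSeqMetadata(
    batchWatermarkMs: Long = 0,
    batchTimestampMs: Long = 0,
    conf: Map[String, String] = Map.empty)

// Illustrative usage: record the shuffle partition count under its SQLConf key.
val metadata = OffsetSeqMetadata(
  batchWatermarkMs = 0,
  batchTimestampMs = System.currentTimeMillis(),
  conf = Map("spark.sql.shuffle.partitions" -> "10"))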
Hi @kunalkhamar, in case you update OffsetSeq's log version number, the work being done in #17070 might be helpful.
@zsxwing Changed to a map
@lw-lin Hi Liwei!
Thanks for letting me know. We will not be updating the log version number, since backward and forward compatibility is preserved by this patch.
Test build #74224 has finished for PR 17216 at commit
Test build #74226 has finished for PR 17216 at commit
/*
 * For backwards compatibility, if # partitions was not recorded in the offset log, then
 * ensure it is non-zero. The new value is picked up from the conf.
 */
For inline comments within the code, use // and not /* .. */.
Changed.
Test build #74290 has finished for PR 17216 at commit
Test build #74344 has finished for PR 17216 at commit
Test build #74352 has finished for PR 17216 at commit
Test build #74353 has finished for PR 17216 at commit
 */
- case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+ case class OffsetSeqMetadata(
+     var batchWatermarkMs: Long = 0,
Do you know why we have these as vars? Can they be made into vals?
Changed to vals.
}

// If the number of partitions is greater, should throw exception.
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
Can you check whether the returned message is useful?
Seems okay to me. The underlying cause is a FileNotFoundException. The error message indicates: Error reading delta file /path/to/checkpoint/state/[operator]/[partition]/[batch].delta
[info] - SPARK-19873: backward compatibility - recover with wrong num shuffle partitions *** FAILED *** (12 seconds, 98 milliseconds)
[info] org.apache.spark.sql.streaming.StreamingQueryException: Query badQuery [id = dddc5e7f-1e71-454c-8362-de184444fb5a, runId = b2960c74-257a-4eb1-b242-61d13e20655f] terminated with exception: Job aborted due to stage failure: Task 10 in stage 1.0 failed 1 times, most recent failure: Lost task 10.0 in stage 1.0 (TID 11, localhost, executor driver): java.lang.IllegalStateException: Error reading delta file /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta of HDFSStateStoreProvider[id = (op=0, part=10), dir = /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10]: /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta does not exist
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:384)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:336)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:333)
[info] at scala.Option.getOrElse(Option.scala:121)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:333)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
[info] at scala.Option.getOrElse(Option.scala:121)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:332)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:239)
[info] at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:191)
[info] at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
[info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
[info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[info] at org.apache.spark.scheduler.Task.run(Task.scala:108)
[info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
[info] Caused by: java.io.FileNotFoundException: File /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta does not exist
[info] at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
[info] at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
[info] at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
[info] at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
[info] at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
[info] at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
[info] at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:61)
[info] at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
[info] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:381)
[info] ... 24 more
// Checkpoint data was generated by a query with 10 shuffle partitions.
// Test if recovery from checkpoint is successful.
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
  query.start().processAllAvailable()
It's not clear that this would actually re-execute a batch. Unless a batch is executed, this does not test anything. How about adding more data after processAllAvailable(), to ensure that at least one batch is actually executed?
Added.
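A hedged sketch of the pattern being asked for, assuming inputData is a MemoryStream[Int] and checkpointDir points at the pre-generated checkpoint (names follow the snippets in this thread; the helper wiring in the actual suite differs):

// Recover from the existing checkpoint, then push new data so that at least one
// fresh batch really executes against the restored state.
val query = inputData.toDF()
  .groupBy("value")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", checkpointDir.getAbsolutePath)
  .format("memory")
  .queryName("counts")
  .start()

query.processAllAvailable()    // recovery alone may not run a new batch
inputData.addData(5, 6, 7, 8)  // new data forces another batch to execute
query.processAllAvailable()
query.stop()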
inputData.addData(3, 4, 5, 6)
inputData.addData(5, 6, 7, 8)

val resourceUri =
Can you add a comment saying that we start the query with existing checkpoints generated by 2.1, which do not have the number of shuffle partitions recorded?
Added more comments.
QueryTest.checkAnswer(spark.table("counts").toDF(),
  Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
  Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
}
You don't seem to stop the query. It would be good to put a try .. finally within the withSQLConf to stop the query; otherwise it can lead to cascading failures.
Added try .. finally
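A small sketch of the guard being requested, assuming the suite's withSQLConf helper, a streaming DataFrame df, and checkpointDir are in scope (all assumptions here):

withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
  var query: StreamingQuery = null
  try {
    query = df.writeStream
      .outputMode("complete")
      .option("checkpointLocation", checkpointDir.getAbsolutePath)
      .format("memory")
      .queryName("counts")
      .start()
    query.processAllAvailable()
  } finally {
    // Always stop the query so one failing test cannot cascade into the next.
    if (query != null) query.stop()
  }
}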
LGTM. A few comments in tests.
val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
offsetSeqMetadata = {
  if (nextOffsets.metadata.isEmpty) {
    OffsetSeqMetadata(0, 0,
nit: can you make this call with named params
OffsetSeqMetadata(
batchWatermarkMs = 0,
...
)
Changed.
val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
  // For backward compatibility, if # partitions was not recorded in the offset log,
  // then ensure it is not missing. The new value is picked up from the conf.
  logDebug("Number of shuffle partitions from previous run not found in checkpoint. "
Make this a log warning so that we can debug. It should be printed only once, at the time of upgrading for the first time.
Changed to a log warning.
Rechecked the semantics; it works as expected, and the warning is only printed at the time of the first upgrade.
Once we restart a query from a v2.1 checkpoint and then stop it, any new offsets written out will contain the number of shuffle partitions. Any future restart will read these new offsets in StreamExecution.populateStartOffsets -> offsetLog.getLatest and pick up the recorded number of shuffle partitions.
Useful to note for future reference: we do not rewrite the old offset files to contain the number of shuffle partitions; the semantics are still correct because of the call to offsetLog.getLatest.
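Putting the pieces together, the recovery fallback described above looks roughly like this (a sketch based on the snippets quoted in this thread; the exact log message and surrounding code in StreamExecution may differ):

// If the offset log was written by Spark 2.1 (no confs recorded), fall back to the
// value in the current session's conf; otherwise enforce the recorded value.
val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
  logWarning("Number of shuffle partitions from previous run not found in checkpoint. "
    + s"Using the value from the conf, $shufflePartitionsSparkSession shuffle partitions.")
  shufflePartitionsSparkSession.toString
})
sparkSessionToRunBatches.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse)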
}
}
}
offsetSeqMetadata = OffsetSeqMetadata(
You can make this offsetSeqMetadata.copy(batchWatermarkMs = batchWatermarkMs, batchTimestampMs = triggerClock.getTimeMillis())
Good point, changed.
val checkpointDir = new File(resourceUri)

// 1 - Test if recovery from the checkpoint is successful.
init()
nit: init -> prepareMemoryStream
Changed.
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
  var streamingQuery: StreamingQuery = null
  try {
    intercept[StreamingQueryException] {
What is the error message?
// 2 - Check recovery with wrong num shuffle partitions
init()
withTempDir(dir => {
nit: withTempDir { dir =>
Changed.
// 1 - Test if recovery from the checkpoint is successful.
init()
withTempDir(dir => {
nit: withTempDir { dir =>
Changed.
LGTM. Just a few nits.
Test build #74561 has finished for PR 17216 at commit
Test build #74564 has finished for PR 17216 at commit
Test build #74617 has finished for PR 17216 at commit
LGTM. Will merge after tests pass.
Does this PR mix in some test files?
Test build #74708 has finished for PR 17216 at commit
zsxwing left a comment
Looks pretty good. Just some comments.
I noticed one issue about SessionState.clone: listenerManager is not cloned, so batches in a streaming query cannot be monitored by the user. Of course, it's not related to this PR. Could you fix it in a separate PR?
cd.dataType, cd.timeZoneId)
}

// Reset confs to disallow change in number of partitions
Why do we need to set the confs for every batch? You can set them after recovering offsetSeqMetadata.
Good point, changed.
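In outline, that change is just hoisting the conf update out of the per-batch loop (sketch; variable names follow the snippets quoted in this thread):

// Set once, right after offsetSeqMetadata has been recovered from the offset log,
// instead of resetting the conf before every batch.
sparkSessionToRunBatches.conf.set(
  SQLConf.SHUFFLE_PARTITIONS.key,
  offsetSeqMetadata.conf.getOrElse(
    SQLConf.SHUFFLE_PARTITIONS.key,
    sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)))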
// All set
assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
  OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
nit: could you add a test to verify that unknown fields don't break the serialization? Such as
assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
  OffsetSeqMetadata(
    s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf":{"$key":3},"unknown":1}"""))
Added.
…onf update occurrence to once at beginning when populating offsets.
@uncleGen Not sure what that means, could you please elaborate?
@zsxwing Will change cloning of listener manager in a new PR.
Test build #74754 has finished for PR 17216 at commit
Looks pretty good.
val sparkSessionToRunBatches = sparkSession.cloneSession()
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
nit: remove line.
Yeah, this should be kept. It should use the conf in the cloned session.
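Put together, the setup under discussion looks roughly like this (a sketch; it assumes the cloned session's conf is what seeds the metadata, per the comment above):

// Run all batches in a cloned session so that streaming-specific conf changes
// do not leak into the user's own session.
val sparkSessionToRunBatches = sparkSession.cloneSession()
// Adaptive execution can change the number of shuffle partitions mid-query,
// which would break recovery of partitioned state, so disable it here.
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
// Seed the metadata from the cloned session's conf, as suggested in the review.
offsetSeqMetadata = OffsetSeqMetadata(
  batchWatermarkMs = 0,
  batchTimestampMs = 0,
  conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
    sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)))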
Test build #74757 has finished for PR 17216 at commit
Test build #74758 has finished for PR 17216 at commit
LGTM. Merging it to master.
Test build #74760 has finished for PR 17216 at commit
Seems like an unrelated failure. Probably a flaky test.
…On Mar 17, 2017 4:23 PM, "UCB AMPLab" ***@***.***> wrote:
Merged build finished. Test FAILed.
// Checkpoint data was generated by a query with 10 shuffle partitions.
// In order to test reading from the checkpoint, the checkpoint must have two or more batches,
// since the last batch may be rerun.
What changes were proposed in this pull request?
If the user changes the shuffle partition number between batches, streaming aggregation will fail.
Here are some possible cases:
How was this patch tested?
OffsetSeqMetadata json with Spark v2.1.0
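To make the failure mode concrete, a hedged sketch of the scenario (inputData is an assumed MemoryStream[Int]; the checkpoint path is illustrative):

// Run 1: a stateful aggregation checkpointed while spark.sql.shuffle.partitions = 10.
spark.conf.set("spark.sql.shuffle.partitions", "10")
val firstRun = inputData.toDF().groupBy("value").count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/agg-checkpoint")  // illustrative path
  .format("memory")
  .queryName("counts")
  .start()
firstRun.processAllAvailable()
firstRun.stop()

// Run 2: restart from the same checkpoint with 15 partitions. Before this patch the
// state store looks for delta files for partitions 10-14 that were never written,
// and recovery fails with "Error reading delta file ... does not exist".
spark.conf.set("spark.sql.shuffle.partitions", "15")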