-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run #25987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| }, | ||
| CheckNewAnswer(("c", "-1")), | ||
| assertNumStateRows(total = 0, updated = 0) | ||
| assertNumStateRows(total = 0, updated = 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This represents the patch is correct fix, though total = 0, updated = 1 might make someone feeling odd. This is because FlatMapGroupsWithState counts deletion of state as "update", whereas we don't count it as "update" in native streaming aggregation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are there other tests where we can check this fix as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the change we encounter the issue is small, as the bug is affecting the metric only when no data batch updates the state row. This is not occurred in non-arbitrary stateful operations, because possible state update is only eviction in this case, and state row eviction is not counted as updates.
This can be occurred in arbitrary stateful operations as timer can be triggered for no data batch and the query can update/delete state row which would trigger update count. This test is verifying one of these cases (delete state), hence I didn't add the new test. If we would like to have another test for state func to update the state row on timeout I can do, but most of things would be redundant.
|
Test build #111642 has finished for PR 25987 at commit
|
|
I've added the query and changed outputs to show which issue this PR fixes. |
|
retest this, please |
|
Test build #113997 has finished for PR 25987 at commit
|
|
Looks like known CI shutdown. |
|
retest this, please |
|
Test build #114002 has finished for PR 25987 at commit
|
9866c9f to
0f59ee2
Compare
|
Test build #114943 has finished for PR 25987 at commit
|
|
retest this, please |
|
Test build #116586 has finished for PR 25987 at commit
|
0f59ee2 to
c6993d0
Compare
|
Test build #118492 has finished for PR 25987 at commit
|
|
Retest this, please |
|
Test build #118496 has finished for PR 25987 at commit
|
brkyvz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there other spots where we can add tests to @HeartSaVioR ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can just use runBatch here, or rather the new name hasExecuted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'll resolve conflict and use hasExecuted. Thanks!
| }, | ||
| CheckNewAnswer(("c", "-1")), | ||
| assertNumStateRows(total = 0, updated = 0) | ||
| assertNumStateRows(total = 0, updated = 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are there other tests where we can check this fix as well?
…to 0 if empty batch is run
|
Thanks for reviewing. Just rebased and reflected review comment. Btw, I've left a comment to explain why I simply modified the existing test instead - #25987 (comment) |
|
Test build #120953 has finished for PR 25987 at commit
|
brkyvz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Merging to master and 3.0. Thanks @HeartSaVioR |
…tor to 0 if empty batch is run ### What changes were proposed in this pull request? This patch fixes the behavior of ProgressReporter which always overwrite the value of "updated" of state operator to 0 if there's no new data. The behavior is correct only when we copy the state progress from "previous" executed plan, meaning no batch has been run. (Nonzero value of "updated" would be odd if batch didn't run, so it was correct.) It was safe to assume no data is no batch, but SPARK-24156 enables empty data can run the batch if Spark needs to deal with watermark. After the patch, it only overwrites the value if both two conditions are met: 1) no data 2) no batch. ### Why are the changes needed? Currently Spark doesn't reflect correct metrics when empty batch is run and this patch fixes it. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Modified UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed. Also manually tested via below query (not a simple query to test with spark-shell, as you'll meet closure issue in spark-shell while playing with state func): > query ``` case class RunningCount(count: Long) object TestFlatMapGroupsWithState { def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession val ss = SparkSession .builder() .appName("TestFlatMapGroupsWithState") .getOrCreate() ss.conf.set("spark.sql.shuffle.partitions", "5") import ss.implicits._ val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.hasTimedOut) { // End users are not restricted to remove the state here - they can update the // state as well. For example, event time session window would have list of // sessions here and it cannot remove entire state. state.update(RunningCount(-1)) Iterator((key, "-1")) } else { val count = state.getOption.map(_.count).getOrElse(0L) + values.size state.update(RunningCount(count)) state.setTimeoutDuration("1 seconds") Iterator((key, count.toString)) } } implicit val sqlContext = ss.sqlContext val inputData = MemoryStream[String] val result = inputData .toDF() .as[String] .groupByKey { v => v } .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(stateFunc) val query = result .writeStream .format("memory") .option("queryName", "test") .outputMode("append") .trigger(Trigger.ProcessingTime("5 second")) .start() Thread.sleep(1000) var chIdx: Long = 0 while (true) { (chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) } chIdx += 5 // intentionally sleep much more than trigger to enable "empty" batch Thread.sleep(10 * 1000) } } } ``` > before the patch (batch 3 which was an "empty" batch) ``` { "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e", "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b", "name":"test", "timestamp":"2019-11-18T07:00:25.005Z", "batchId":3, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0, "durationMs":{ "addBatch":1664, "getBatch":0, "latestOffset":0, "queryPlanning":29, "triggerExecution":1789, "walCommit":51 }, "stateOperators":[ { "numRowsTotal":10, "numRowsUpdated":0, "memoryUsedBytes":5130, "customMetrics":{ "loadedMapCacheHitCount":15, "loadedMapCacheMissCount":0, "stateOnCurrentVersionSizeBytes":2722 } } ], "sources":[ { "description":"MemoryStream[value#1]", "startOffset":9, "endOffset":9, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0 } ], "sink":{ "description":"MemorySink", "numOutputRows":5 } } ``` > after the patch (batch 3 which was an "empty" batch) ``` { "id":"7cb41623-6b9a-408e-ae02-6796ec636fa0", "runId":"17847710-ddfe-45f5-a7ab-b160e149382f", "name":"test", "timestamp":"2019-11-18T07:02:25.005Z", "batchId":3, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0, "durationMs":{ "addBatch":1196, "getBatch":0, "latestOffset":0, "queryPlanning":30, "triggerExecution":1333, "walCommit":46 }, "stateOperators":[ { "numRowsTotal":10, "numRowsUpdated":5, "memoryUsedBytes":5130, "customMetrics":{ "loadedMapCacheHitCount":15, "loadedMapCacheMissCount":0, "stateOnCurrentVersionSizeBytes":2722 } } ], "sources":[ { "description":"MemoryStream[value#1]", "startOffset":9, "endOffset":9, "numInputRows":0, "inputRowsPerSecond":0.0, "processedRowsPerSecond":0.0 } ], "sink":{ "description":"MemorySink", "numOutputRows":5 } } ``` "numRowsUpdated" is `0` in "stateOperators" before applying the patch which is "wrong", as we "update" the state when timeout occurs. After applying the patch, it correctly represents the "numRowsUpdated" as `5` in "stateOperators". Closes #25987 from HeartSaVioR/SPARK-29314. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Burak Yavuz <[email protected]> (cherry picked from commit ca2ba4f) Signed-off-by: Burak Yavuz <[email protected]>
|
Thanks for reviewing and merging! |
…tor to 0 if empty batch is run
### What changes were proposed in this pull request?
This patch fixes the behavior of ProgressReporter which always overwrite the value of "updated" of state operator to 0 if there's no new data. The behavior is correct only when we copy the state progress from "previous" executed plan, meaning no batch has been run. (Nonzero value of "updated" would be odd if batch didn't run, so it was correct.)
It was safe to assume no data is no batch, but SPARK-24156 enables empty data can run the batch if Spark needs to deal with watermark. After the patch, it only overwrites the value if both two conditions are met: 1) no data 2) no batch.
### Why are the changes needed?
Currently Spark doesn't reflect correct metrics when empty batch is run and this patch fixes it.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Modified UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed.
Also manually tested via below query (not a simple query to test with spark-shell, as you'll meet closure issue in spark-shell while playing with state func):
> query
```
case class RunningCount(count: Long)
object TestFlatMapGroupsWithState {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val ss = SparkSession
.builder()
.appName("TestFlatMapGroupsWithState")
.getOrCreate()
ss.conf.set("spark.sql.shuffle.partitions", "5")
import ss.implicits._
val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
if (state.hasTimedOut) {
// End users are not restricted to remove the state here - they can update the
// state as well. For example, event time session window would have list of
// sessions here and it cannot remove entire state.
state.update(RunningCount(-1))
Iterator((key, "-1"))
} else {
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
state.update(RunningCount(count))
state.setTimeoutDuration("1 seconds")
Iterator((key, count.toString))
}
}
implicit val sqlContext = ss.sqlContext
val inputData = MemoryStream[String]
val result = inputData
.toDF()
.as[String]
.groupByKey { v => v }
.flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)
val query = result
.writeStream
.format("memory")
.option("queryName", "test")
.outputMode("append")
.trigger(Trigger.ProcessingTime("5 second"))
.start()
Thread.sleep(1000)
var chIdx: Long = 0
while (true) {
(chIdx to chIdx + 4).map { idx => inputData.addData(idx.toString) }
chIdx += 5
// intentionally sleep much more than trigger to enable "empty" batch
Thread.sleep(10 * 1000)
}
}
}
```
> before the patch (batch 3 which was an "empty" batch)
```
{
"id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
"runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
"name":"test",
"timestamp":"2019-11-18T07:00:25.005Z",
"batchId":3,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0,
"durationMs":{
"addBatch":1664,
"getBatch":0,
"latestOffset":0,
"queryPlanning":29,
"triggerExecution":1789,
"walCommit":51
},
"stateOperators":[
{
"numRowsTotal":10,
"numRowsUpdated":0,
"memoryUsedBytes":5130,
"customMetrics":{
"loadedMapCacheHitCount":15,
"loadedMapCacheMissCount":0,
"stateOnCurrentVersionSizeBytes":2722
}
}
],
"sources":[
{
"description":"MemoryStream[value#1]",
"startOffset":9,
"endOffset":9,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0
}
],
"sink":{
"description":"MemorySink",
"numOutputRows":5
}
}
```
> after the patch (batch 3 which was an "empty" batch)
```
{
"id":"7cb41623-6b9a-408e-ae02-6796ec636fa0",
"runId":"17847710-ddfe-45f5-a7ab-b160e149382f",
"name":"test",
"timestamp":"2019-11-18T07:02:25.005Z",
"batchId":3,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0,
"durationMs":{
"addBatch":1196,
"getBatch":0,
"latestOffset":0,
"queryPlanning":30,
"triggerExecution":1333,
"walCommit":46
},
"stateOperators":[
{
"numRowsTotal":10,
"numRowsUpdated":5,
"memoryUsedBytes":5130,
"customMetrics":{
"loadedMapCacheHitCount":15,
"loadedMapCacheMissCount":0,
"stateOnCurrentVersionSizeBytes":2722
}
}
],
"sources":[
{
"description":"MemoryStream[value#1]",
"startOffset":9,
"endOffset":9,
"numInputRows":0,
"inputRowsPerSecond":0.0,
"processedRowsPerSecond":0.0
}
],
"sink":{
"description":"MemorySink",
"numOutputRows":5
}
}
```
"numRowsUpdated" is `0` in "stateOperators" before applying the patch which is "wrong", as we "update" the state when timeout occurs. After applying the patch, it correctly represents the "numRowsUpdated" as `5` in "stateOperators".
Closes apache#25987 from HeartSaVioR/SPARK-29314.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Burak Yavuz <[email protected]>
What changes were proposed in this pull request?
This patch fixes the behavior of ProgressReporter which always overwrite the value of "updated" of state operator to 0 if there's no new data. The behavior is correct only when we copy the state progress from "previous" executed plan, meaning no batch has been run. (Nonzero value of "updated" would be odd if batch didn't run, so it was correct.)
It was safe to assume no data is no batch, but SPARK-24156 enables empty data can run the batch if Spark needs to deal with watermark. After the patch, it only overwrites the value if both two conditions are met: 1) no data 2) no batch.
Why are the changes needed?
Currently Spark doesn't reflect correct metrics when empty batch is run and this patch fixes it.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Modified UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed.
Also manually tested via below query (not a simple query to test with spark-shell, as you'll meet closure issue in spark-shell while playing with state func):
"numRowsUpdated" is
0in "stateOperators" before applying the patch which is "wrong", as we "update" the state when timeout occurs. After applying the patch, it correctly represents the "numRowsUpdated" as5in "stateOperators".