From fff18ff9bff8ccd706b42a2c61f85c287118b57c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 27 Sep 2019 05:05:36 +0900 Subject: [PATCH 1/8] [SPARK-28074][DOC][SS] Document caveats on using multiple stateful operations in single query --- .../structured-streaming-programming-guide.md | 25 +++++++++- .../UnsupportedOperationChecker.scala | 47 ++++++++++++++++++- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index deaf262c5f57..539adc4d1f67 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1503,8 +1503,6 @@ Additional details on supported joins: - Cannot use streaming aggregations before joins. - - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins. - ### Streaming Deduplication You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking. @@ -1616,6 +1614,8 @@ this configuration judiciously. ### Arbitrary Stateful Operations Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. 
For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). +Though Spark cannot check and force it, state function should be implemented with respect of semantic of output mode. e.g. In update mode Spark doesn't expect state function will emit rows which are older than current watermark, whereas in Append mode state function can emit these rows. + ### Unsupported Operations There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows. @@ -1647,6 +1647,27 @@ For example, sorting on the input stream is not supported, as it requires keepin track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. +### Limitation of global watermark + +In some circumstance, some stateful operations could emit rows older than current watermark (with allowed delay), +which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded. +This could bring correctness issue. + +This is a limitation of global watermark and operator-wise watermark is not yet supported. Before Spark will support +operator-wise watermark, Spark will check the logical plan of query and log warning when Spark detects such pattern. 
+ +Any stateful operation(s) after any of below stateful operations are possibly having issue: + +* streaming aggregation in Append mode +* stream-stream outer join +* mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of state function) + +As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that state function +could emit late rows if the operator is append mode. + +There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure +end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional. + ## Starting Streaming Queries Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter` ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 288ff1a04737..a304787684d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, CurrentDate, CurrentTimestamp, MonotonicallyIncreasingID} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression @@ -30,7 +31,7 @@ import 
org.apache.spark.sql.streaming.OutputMode /** * Analyzes the presence of unsupported operations in a logical plan. */ -object UnsupportedOperationChecker { +object UnsupportedOperationChecker extends Logging { def checkForBatch(plan: LogicalPlan): Unit = { plan.foreachUp { @@ -41,8 +42,48 @@ object UnsupportedOperationChecker { } } - def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = { + def warnStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = { + def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { + case s: Aggregate + if s.isStreaming && outputMode == InternalOutputModes.Append => true + case ExtractEquiJoinKeys(joinType, _, _, _, left, right, _) + if left.isStreaming && right.isStreaming && joinType != Inner => true + case f: FlatMapGroupsWithState + if f.isStreaming && f.outputMode == OutputMode.Append() => true + case _ => false + } + + def isStatefulOperation(p: LogicalPlan): Boolean = p match { + case s: Aggregate if s.isStreaming => true + case _ @ ExtractEquiJoinKeys(_, _, _, _, left, right, _) + if left.isStreaming && right.isStreaming => true + case f: FlatMapGroupsWithState if f.isStreaming => true + case d: Deduplicate if d.isStreaming => true + case _ => false + } + var loggedWarnMessage = false + plan.foreach { subPlan => + if (isStatefulOperation(subPlan)) { + subPlan.find { p => + (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p) + } match { + case Some(_) if !loggedWarnMessage => + logWarning("Detected pattern of possible 'correctness' issue " + + "due to global watermark. " + + "The query contains stateful operation which can possibly emit late rows, and " + + "downstream stateful operation which drop emitted late rows. " + + "Please refer the programming guide doc for more details." 
+ + s";\n$plan") + loggedWarnMessage = true + + case _ => + } + } + } + } + + def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = { if (!plan.isStreaming) { throwError( "Queries without streaming sources cannot be executed with writeStream.start()")(plan) @@ -339,6 +380,8 @@ object UnsupportedOperationChecker { // Check if there are unsupported expressions in streaming query plan. checkUnsupportedExpressions(subPlan) } + + warnStreamingQueryGlobalWatermarkLimit(plan, outputMode) } def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = { From 6f2286459e2d3701fc01a3ad9294395220733a2c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 27 Sep 2019 18:20:20 +0900 Subject: [PATCH 2/8] Add various UTs for testing checkStreamingQueryGlobalWatermarkLimit --- .../UnsupportedOperationChecker.scala | 29 ++- .../analysis/UnsupportedOperationsSuite.scala | 181 ++++++++++++++++++ 2 files changed, 200 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index a304787684d9..07bbbb42be8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -42,11 +42,14 @@ object UnsupportedOperationChecker extends Logging { } } - def warnStreamingQueryGlobalWatermarkLimit(plan: LogicalPlan, outputMode: OutputMode): Unit = { + def checkStreamingQueryGlobalWatermarkLimit( + plan: LogicalPlan, + outputMode: OutputMode, + failWhenDetected: Boolean): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { case s: Aggregate if s.isStreaming && outputMode == InternalOutputModes.Append => true - case ExtractEquiJoinKeys(joinType, _, _, _, left, right, _) 
+ case Join(left, right, joinType, _, _) if left.isStreaming && right.isStreaming && joinType != Inner => true case f: FlatMapGroupsWithState if f.isStreaming && f.outputMode == OutputMode.Append() => true @@ -55,8 +58,7 @@ object UnsupportedOperationChecker extends Logging { def isStatefulOperation(p: LogicalPlan): Boolean = p match { case s: Aggregate if s.isStreaming => true - case _ @ ExtractEquiJoinKeys(_, _, _, _, left, right, _) - if left.isStreaming && right.isStreaming => true + case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => true case f: FlatMapGroupsWithState if f.isStreaming => true case d: Deduplicate if d.isStreaming => true case _ => false @@ -68,14 +70,21 @@ object UnsupportedOperationChecker extends Logging { subPlan.find { p => (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p) } match { - case Some(_) if !loggedWarnMessage => - logWarning("Detected pattern of possible 'correctness' issue " + + case Some(_) => + val errorMsg = "Detected pattern of possible 'correctness' issue " + "due to global watermark. " + "The query contains stateful operation which can possibly emit late rows, and " + "downstream stateful operation which drop emitted late rows. " + - "Please refer the programming guide doc for more details." + - s";\n$plan") - loggedWarnMessage = true + "Please refer the programming guide doc for more details." 
+ + if (failWhenDetected) { + throwError(errorMsg)(plan) + } else { + if (!loggedWarnMessage) { + logWarning(s"$errorMsg;\n$plan") + loggedWarnMessage = true + } + } case _ => } @@ -381,7 +390,7 @@ object UnsupportedOperationChecker extends Logging { checkUnsupportedExpressions(subPlan) } - warnStreamingQueryGlobalWatermarkLimit(plan, outputMode) + checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false) } def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 0fe646edb340..3ec6fdeedd4b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -643,6 +643,153 @@ class UnsupportedOperationsSuite extends SparkFunSuite { null, new TestStreamingRelationV2(attribute)), OutputMode.Append()) + // streaming aggregation + { + assertPassOnGlobalWatermarkLimit( + "single streaming aggregation in Append mode", + streamRelation.groupBy("a")(count("*")), + OutputMode.Append()) + + assertFailOnGlobalWatermarkLimit( + "chained streaming aggregations in Append mode", + streamRelation.groupBy("a")(count("*")).groupBy()(count("*")), + OutputMode.Append()) + + Seq(Inner, LeftOuter, RightOuter).foreach { joinType => + val plan = streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType = joinType) + assertFailOnGlobalWatermarkLimit( + s"$joinType join after streaming aggregation in Append mode", + streamRelation.join(streamRelation.groupBy("a")(count("*")), joinType = joinType), + OutputMode.Append()) + } + + assertFailOnGlobalWatermarkLimit( + "deduplicate after streaming aggregation in Append mode", + Deduplicate(Seq(attribute), 
streamRelation.groupBy("a")(count("*"))), + OutputMode.Append()) + + assertFailOnGlobalWatermarkLimit( + "FlatMapGroupsWithState after streaming aggregation in Append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, + streamRelation.groupBy("a")(count("*"))), + OutputMode.Append()) + } + + // stream-stream join + // stream-stream inner join doesn't emit late rows, whereas outer joins could + Seq((Inner, false), (LeftOuter, true), (RightOuter, true)).map { case (joinType, expectFailure) => + assertPassOnGlobalWatermarkLimit( + s"single $joinType join in Append mode", + streamRelation.join(streamRelation, joinType = joinType, + condition = Some(attributeWithWatermark === attribute)), + OutputMode.Append()) + + testGlobalWatermarkLimit( + s"streaming aggregation after stream-stream $joinType join in Append mode", + streamRelation.join(streamRelation, joinType = joinType, + condition = Some(attributeWithWatermark === attribute)) + .groupBy("a")(count("*")), + OutputMode.Append(), + expectFailure = expectFailure) + + Seq(Inner, LeftOuter, RightOuter).map { joinType2 => + testGlobalWatermarkLimit( + s"stream-stream $joinType2 join after stream-stream $joinType join in Append mode", + streamRelation.join( + streamRelation.join(streamRelation, joinType = joinType, + condition = Some(attributeWithWatermark === attribute)), + joinType = joinType2, + condition = Some(attributeWithWatermark === attribute)), + OutputMode.Append(), + expectFailure = expectFailure) + } + + testGlobalWatermarkLimit( + s"FlatMapGroupsWithState after stream-stream $joinType join in Append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, + streamRelation.join(streamRelation, joinType = joinType, + condition = Some(attributeWithWatermark === attribute))), + OutputMode.Append(), + expectFailure = expectFailure) + + testGlobalWatermarkLimit( +
s"deduplicate after stream-stream $joinType join in Append mode", + Deduplicate(Seq(attribute), streamRelation.join(streamRelation, joinType = joinType, + condition = Some(attributeWithWatermark === attribute))), + OutputMode.Append(), + expectFailure = expectFailure) + } + + // FlatMapGroupsWithState + { + assertPassOnGlobalWatermarkLimit( + "single FlatMapGroupsWithState in Append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, streamRelation), + OutputMode.Append()) + + assertFailOnGlobalWatermarkLimit( + "streaming aggregation after FlatMapGroupsWithState in Append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")), + OutputMode.Append()) + + Seq(Inner, LeftOuter, RightOuter).map { joinType => + assertFailOnGlobalWatermarkLimit( + s"stream-stream $joinType after FlatMapGroupsWithState in Append mode", + streamRelation.join( + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, streamRelation), joinType = joinType, + condition = Some(attributeWithWatermark === attribute)), + OutputMode.Append()) + } + + assertFailOnGlobalWatermarkLimit( + "FlatMapGroupsWithState after FlatMapGroupsWithState in Append mode", + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, streamRelation)), + OutputMode.Append()) + + assertFailOnGlobalWatermarkLimit( + s"deduplicate after FlatMapGroupsWithState in Append mode", + Deduplicate(Seq(attribute), + FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, streamRelation)), + OutputMode.Append()) + } + + // deduplicate + { + 
assertPassOnGlobalWatermarkLimit( + "streaming aggregation after deduplicate in Append mode", + Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")), + OutputMode.Append()) + + Seq(Inner, LeftOuter, RightOuter).map { joinType => + assertPassOnGlobalWatermarkLimit( + s"$joinType join after deduplicate in Append mode", + streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType, + condition = Some(attributeWithWatermark === attribute)), + OutputMode.Append()) + } + + assertPassOnGlobalWatermarkLimit( + "FlatMapGroupsWithState after deduplicate in Append mode", + FlatMapGroupsWithState( + null, att, att, Seq(att), Seq(att), att, null, Append, + isMapGroupsWithState = false, null, + Deduplicate(Seq(attribute), streamRelation)), + OutputMode.Append()) + } + /* ======================================================================================= TESTING FUNCTIONS @@ -839,6 +986,40 @@ class UnsupportedOperationsSuite extends SparkFunSuite { } } + + def assertPassOnGlobalWatermarkLimit( + testNamePostfix: String, + plan: LogicalPlan, + outputMode: OutputMode): Unit = { + testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = false) + } + + def assertFailOnGlobalWatermarkLimit( + testNamePostfix: String, + plan: LogicalPlan, + outputMode: OutputMode): Unit = { + testGlobalWatermarkLimit(testNamePostfix, plan, outputMode, expectFailure = true) + } + + def testGlobalWatermarkLimit( + testNamePostfix: String, + plan: LogicalPlan, + outputMode: OutputMode, + expectFailure: Boolean): Unit = { + test(s"Global watermark limit - $testNamePostfix") { + if (expectFailure) { + val e = intercept[AnalysisException] { + UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( + wrapInStreaming(plan), outputMode, failWhenDetected = true) + } + assert(e.message.contains("Detected pattern of possible 'correctness' issue")) + } else { + UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( + 
wrapInStreaming(plan), outputMode, failWhenDetected = true) + } + } + } + /** * Test whether the body of code will fail. If it does fail, then check if it has expected * messages. From 232a8fc29e0f70281380ca585a7264a6d01579cd Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 28 Sep 2019 06:28:29 +0900 Subject: [PATCH 3/8] Reflect review comments --- .../structured-streaming-programming-guide.md | 15 ++++----- .../UnsupportedOperationChecker.scala | 33 ++++++++----------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 539adc4d1f67..ffe08157f299 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1614,7 +1614,7 @@ this configuration judiciously. ### Arbitrary Stateful Operations Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. 
For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). -Though Spark cannot check and force it, state function should be implemented with respect of semantic of output mode. e.g. In update mode Spark doesn't expect state function will emit rows which are older than current watermark, whereas in Append mode state function can emit these rows. +Though Spark cannot check and force it, state function should be implemented with respect to the semantics of output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark, whereas in Append mode the state function can emit these rows. ### Unsupported Operations There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. @@ -1649,21 +1649,20 @@ efficiently. ### Limitation of global watermark -In some circumstance, some stateful operations could emit rows older than current watermark (with allowed delay), +In Append mode, some stateful operations could emit rows older than current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded. -This could bring correctness issue. +This is a limitation of global watermark, and it could bring correctness issue. -This is a limitation of global watermark and operator-wise watermark is not yet supported. 
Before Spark will support -operator-wise watermark, Spark will check the logical plan of query and log warning when Spark detects such pattern. +Spark will check the logical plan of query and log warning when Spark detects such pattern. -Any stateful operation(s) after any of below stateful operations are possibly having issue: +Any of the following stateful operation(s) after any of below stateful operations can have this issue: * streaming aggregation in Append mode * stream-stream outer join * mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of state function) -As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that state function -could emit late rows if the operator is append mode. +As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that the state function +could emit late rows if the operator uses Append mode. There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 07bbbb42be8a..8bea934a0171 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -64,31 +64,24 @@ object UnsupportedOperationChecker extends Logging { case _ => false } - var loggedWarnMessage = false - plan.foreach { subPlan => - if (isStatefulOperation(subPlan)) { - subPlan.find { p => - (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p) - } match { - case Some(_) => + try { + plan.foreach { subPlan => + if (isStatefulOperation(subPlan)) { + subPlan.find { p => + (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p) + }.foreach { val errorMsg = "Detected pattern of possible 'correctness' issue " + "due to global watermark. " + - "The query contains stateful operation which can possibly emit late rows, and " + - "downstream stateful operation which drop emitted late rows. " + + "The query contains stateful operation which can emit rows older than " + + "the current watermark plus allowed late record delay, which are \"late rows\"" + + " in downstream stateful operations and these rows can be discarded. " + "Please refer the programming guide doc for more details." 
- - if (failWhenDetected) { - throwError(errorMsg)(plan) - } else { - if (!loggedWarnMessage) { - logWarning(s"$errorMsg;\n$plan") - loggedWarnMessage = true - } - } - - case _ => + throwError(errorMsg)(plan) + } } } + } catch { + case e: AnalysisException if !failWhenDetected => logWarning(s"${e.message};\n$plan") } } From 4206bb362c751f8626205397cef8686ed684702a Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 28 Sep 2019 06:30:20 +0900 Subject: [PATCH 4/8] Roll back unrelated change --- docs/structured-streaming-programming-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index ffe08157f299..7a8a85c8aa31 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1503,6 +1503,7 @@ Additional details on supported joins: - Cannot use streaming aggregations before joins. + - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins. ### Streaming Deduplication You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking. From bb24dcb1e56178b77a0028518aa08b153d39e894 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 28 Sep 2019 06:31:25 +0900 Subject: [PATCH 5/8] More refines.. 
--- docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 7a8a85c8aa31..931ec22948cd 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1615,7 +1615,7 @@ this configuration judiciously. ### Arbitrary Stateful Operations Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)). -Though Spark cannot check and force it, state function should be implemented with respect to the semantics of output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark, whereas in Append mode the state function can emit these rows. 
+Though Spark cannot check and force it, state function should be implemented with respect to the semantics of output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark plus allowed late record delay, whereas in Append mode the state function can emit these rows. ### Unsupported Operations There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. From b6c7f7b43ad514b3b9e890c5eac97a61c3ac2cf6 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 28 Sep 2019 06:32:20 +0900 Subject: [PATCH 6/8] Address missing one --- docs/structured-streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 931ec22948cd..df82c06b3c9a 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1660,9 +1660,9 @@ Any of the following stateful operation(s) after any of below stateful operation * streaming aggregation in Append mode * stream-stream outer join -* mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of state function) +* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of state function) -As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that the state function +As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function could emit late rows if the operator uses Append mode. 
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure From 57f1cecbf770be05ddfb7034cb6f3f1afe3633f3 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 28 Sep 2019 09:42:03 +0900 Subject: [PATCH 7/8] Fix silly one --- .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 8bea934a0171..2f8cb26ffaa9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -69,7 +69,7 @@ object UnsupportedOperationChecker extends Logging { if (isStatefulOperation(subPlan)) { subPlan.find { p => (p ne subPlan) && isStatefulOperationPossiblyEmitLateRows(p) - }.foreach { + }.foreach { _ => val errorMsg = "Detected pattern of possible 'correctness' issue " + "due to global watermark. " + "The query contains stateful operation which can emit rows older than " + From d2d511ecc8daa307549d8013b24ff31c2a3045fc Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sun, 29 Sep 2019 08:25:11 +0900 Subject: [PATCH 8/8] Reflect review comments --- docs/structured-streaming-programming-guide.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index df82c06b3c9a..2a405f36fd5f 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1615,7 +1615,7 @@ this configuration judiciously. ### Arbitrary Stateful Operations Many usecases require more advanced stateful operations than aggregations. 
For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).

-Though Spark cannot check and force it, state function should be implemented with respect to the semantics of output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark plus allowed late record delay, whereas in Append mode the state function can emit these rows.
+Though Spark cannot check and force it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than current watermark plus allowed late record delay, whereas in Append mode the state function can emit these rows.

 ### Unsupported Operations
 There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
@@ -1650,20 +1650,20 @@ efficiently.
 ### Limitation of global watermark

-In Append mode, some stateful operations could emit rows older than current watermark plus allowed late record delay,
-which are "late rows" in downstream stateful operations (as Spark uses global watermark) and these rows can be discarded.
-This is a limitation of global watermark, and it could bring correctness issue.
+In Append mode, if a stateful operation emits rows older than current watermark plus allowed late record delay,
+they will be "late rows" in downstream stateful operations (as Spark uses global watermark). Note that these rows may be discarded.
+This is a limitation of a global watermark, and it could potentially cause a correctness issue.

-Spark will check the logical plan of query and log warning when Spark detects such pattern.
+Spark will check the logical plan of query and log a warning when Spark detects such a pattern.

-Any of the following stateful operation(s) after any of below stateful operations can have this issue:
+Any of the stateful operation(s) after any of below stateful operations can have this issue:

 * streaming aggregation in Append mode
 * stream-stream outer join
-* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of state function)
+* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)

 As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
-could emit late rows if the operator uses Append mode.
+emits late rows if the operator uses Append mode.

 There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
 end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
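
Illustrative note (not part of the patch): the late-row problem described in the doc change can be modelled without Spark. The sketch below is a deliberately simplified model, not Spark's implementation. `Row`, `emitClosedWindows`, and `dropLateRows` are hypothetical names, and the boundary conditions (a window is emitted once the watermark reaches its end, and a row at or behind the watermark counts as late) are assumptions made for the illustration.

```scala
object GlobalWatermarkSketch {
  // Hypothetical row type: only the event time (in seconds) matters here.
  final case class Row(eventTime: Long)

  // Upstream tumbling-window aggregation in Append mode (simplified model):
  // a window is emitted only once the watermark has reached its end, and the
  // emitted row carries the window end as its event time.
  def emitClosedWindows(input: Seq[Row], windowSize: Long, watermark: Long): Seq[Row] =
    input
      .map(r => (r.eventTime / windowSize + 1) * windowSize) // end of the row's window
      .distinct
      .sorted
      .filter(windowEnd => windowEnd <= watermark)
      .map(windowEnd => Row(windowEnd))

  // Downstream stateful operator (simplified model): rows at or behind the
  // watermark it observes are treated as late and discarded.
  def dropLateRows(rows: Seq[Row], watermark: Long): Seq[Row] =
    rows.filter(_.eventTime > watermark)

  val input: Seq[Row] = Seq(Row(1), Row(4), Row(12), Row(27))
  val emitted: Seq[Row] = emitClosedWindows(input, windowSize = 10, watermark = 25)

  // Single query: the downstream operator shares the global watermark (25).
  val keptInSameQuery: Seq[Row] = dropLateRows(emitted, watermark = 25)

  // Split queries: the second query tracks its own watermark, assumed here
  // to still be 0 when the upstream results arrive.
  val keptInSplitQuery: Seq[Row] = dropLateRows(emitted, watermark = 0)

  def main(args: Array[String]): Unit = {
    println(s"emitted upstream:  $emitted")
    println(s"kept, same query:  $keptInSameQuery")
    println(s"kept, split query: $keptInSplitQuery")
  }
}
```

In this model every row the upstream operator emits is already behind the shared watermark, so the downstream operator in the same query keeps none of them. The same rows survive when the downstream step runs as a separate query with its own watermark, which is the idea behind the workaround described in the doc text.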