6 changes: 5 additions & 1 deletion docs/ss-migration-guide.md
@@ -26,10 +26,14 @@ Note that this migration guide describes the items specific to Structured Streaming.
Many items of the SQL migration guide also apply when migrating Structured Streaming to higher versions.
Please refer to [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).

## Upgrading from Structured Streaming 3.0 to 3.1

- In Spark 3.0 and earlier, for queries containing a stateful operation that can emit rows older than the current watermark plus the allowed late record delay (such rows are "late rows" in downstream stateful operations and can be silently discarded), Spark only prints a warning message. Since Spark 3.1, Spark checks such queries for this possible correctness issue and throws an AnalysisException by default. Users who understand the risk and still decide to run the query can disable the check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to `false`, as sketched below.
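
As a minimal sketch of opting out (assuming an active `SparkSession` named `spark`; the query name is hypothetical):

```scala
// Opt-out sketch: acknowledges the correctness risk and restores the
// pre-3.1 warn-only behavior. Set this before calling writeStream.start().
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
```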

## Upgrading from Structured Streaming 2.4 to 3.0

- In Spark 3.0, Structured Streaming forces the source schema to be nullable when file-based data sources such as text, json, csv, parquet and orc are used via `spark.readStream(...)`. Previously, it respected the nullability of the source schema; however, this caused issues that were tricky to debug, typically NPEs. To restore the previous behavior, set `spark.sql.streaming.fileSource.schema.forceNullable` to `false`, as in the sketch below.
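
For illustration, a hedged sketch of restoring the old behavior; `userSchema` and the input path are placeholders, not part of this change:

```scala
// Assumption: userSchema is a user-defined StructType whose nullability
// should be respected, as it was in Spark 2.4.
spark.conf.set("spark.sql.streaming.fileSource.schema.forceNullable", "false")

val df = spark.readStream
  .schema(userSchema)       // nullability is no longer forced to true
  .json("/path/to/input")   // hypothetical input directory
```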

- Spark 3.0 fixes the correctness issue on stream-stream outer join, which changes the schema of state. (See [SPARK-26154](https://issues.apache.org/jira/browse/SPARK-26154) for more details.) If you start your query from a checkpoint constructed by Spark 2.x that uses stream-stream outer join, Spark 3.0 fails the query. To recalculate outputs, discard the checkpoint and replay previous inputs.
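
One way to do that, sketched below with hypothetical paths and an assumed joined streaming DataFrame `joined`, is to restart the query with a fresh checkpoint location so state is rebuilt under the new schema:

```scala
// Hypothetical restart: a new checkpoint directory forces Spark to rebuild
// the stream-stream outer join state with the fixed (Spark 3.0) schema.
// The old checkpoint (e.g. /checkpoints/join-v1) is abandoned and the
// previous inputs are replayed from the sources.
val query = joined.writeStream
  .format("parquet")
  .option("path", "/output/join-query")
  .option("checkpointLocation", "/checkpoints/join-v2")
  .start()
```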

- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`.
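
For example, a query that previously constructed `ProcessingTime` directly can switch to the `Trigger` factory methods (the `df` below is an assumed streaming DataFrame):

```scala
import org.apache.spark.sql.streaming.Trigger

// Trigger.ProcessingTime replaces the removed ProcessingTime class;
// Trigger.Continuous and Trigger.Once replace the removed/hidden classes.
val query = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```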
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode

/**
@@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
    }
  }

  /**
   * Checks for a possible correctness issue in chained stateful operators. The behavior is
   * controlled by the SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
   * When the config is enabled and such an issue is detected, an analysis exception is thrown;
   * otherwise, Spark just prints a warning message.
   */
  def checkStreamingQueryGlobalWatermarkLimit(
      plan: LogicalPlan,
      outputMode: OutputMode): Unit = {
    def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
      case s: Aggregate
        if s.isStreaming && outputMode == InternalOutputModes.Append => true
@@ -62,6 +68,8 @@ case _ => false
      case _ => false
    }

    val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled

    try {
      plan.foreach { subPlan =>
        if (isStatefulOperation(subPlan)) {
@@ -73,7 +81,10 @@
"The query contains stateful operation which can emit rows older than " +
"the current watermark plus allowed late record delay, which are \"late rows\"" +
" in downstream stateful operations and these rows can be discarded. " +
"Please refer the programming guide doc for more details."
"Please refer the programming guide doc for more details. If you understand " +
"the possible risk of correctness issue and still need to run the query, " +
"you can disable this check by setting the config " +
"`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false."
throwError(errorMsg)(plan)
}
}
@@ -388,7 +399,7 @@
      checkUnsupportedExpressions(subPlan)
    }

    checkStreamingQueryGlobalWatermarkLimit(plan, outputMode)
  }

  def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1382,6 +1382,21 @@ object SQLConf {
      .booleanConf
      .createWithDefault(true)

  val STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED =
    buildConf("spark.sql.streaming.statefulOperator.checkCorrectness.enabled")
[Review comment, Member] For the naming, cc @cloud-fan.
      .internal()
      .doc("When true, the stateful operators for streaming query will be checked for " +
        "possible correctness issue due to global watermark. The correctness issue comes " +
        "from queries containing stateful operation which can emit rows older than the " +
        "current watermark plus allowed late record delay, which are \"late rows\" in " +
        "downstream stateful operations and these rows can be discarded. Please refer to " +
        "the programming guide doc for more details. Once the issue is detected, Spark " +
        "will throw an AnalysisException. When this config is disabled, Spark will just " +
        "print a warning message for users. Prior to Spark 3.1.0, the behavior was " +
        "equivalent to disabling this config.")
      .version("3.1.0")
      .booleanConf
      .createWithDefault(true)

  val VARIABLE_SUBSTITUTE_ENABLED =
    buildConf("spark.sql.variable.substitute")
      .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
@@ -3028,6 +3043,9 @@ class SQLConf extends Serializable with Logging {

  def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

  def statefulOperatorCorrectnessCheckEnabled: Boolean =
    getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED)

  def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

  def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -29,14 +29,15 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder}
import org.apache.spark.unsafe.types.CalendarInterval

/** A dummy command for testing unsupported operations. */
case class DummyCommand() extends Command

class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {

  val attribute = AttributeReference("a", IntegerType, nullable = true)()
  val watermarkMetadata = new MetadataBuilder()
@@ -218,6 +219,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
    expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update"))

  // FlatMapGroupsWithState(Append) in streaming with aggregation
  // Only supported when `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`
  // is disabled.
  for (outputMode <- Seq(Append, Update, Complete)) {
    assertSupportedInStreamingPlan(
      "flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
@@ -228,7 +230,8 @@
      FlatMapGroupsWithState(
        null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
        streamRelation)),
      outputMode = outputMode,
      SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false")
  }

  for (outputMode <- Seq(Append, Update)) {
@@ -268,14 +271,16 @@ }
  }

  // multiple FlatMapGroupsWithStates
  // Only supported when `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`
  // is disabled.
  assertSupportedInStreamingPlan(
    "flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " +
      "in append mode",
    FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
      isMapGroupsWithState = false, null,
      FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
        isMapGroupsWithState = false, null, streamRelation)),
    outputMode = Append,
    SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false")

  assertNotSupportedInStreamingPlan(
    "flatMapGroupsWithState - multiple flatMapGroupsWithStates on a streaming relation but some" +
@@ -995,9 +1000,12 @@
  def assertSupportedInStreamingPlan(
      name: String,
      plan: LogicalPlan,
      outputMode: OutputMode,
      configs: (String, String)*): Unit = {
    test(s"streaming plan - $name: supported") {
      withSQLConf(configs: _*) {
        UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
      }
    }
  }

@@ -1070,14 +1078,18 @@
      expectFailure: Boolean): Unit = {
    test(s"Global watermark limit - $testNamePostfix") {
      if (expectFailure) {
        withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "true") {
          val e = intercept[AnalysisException] {
            UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
              wrapInStreaming(plan), outputMode)
          }
          assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
        }
      } else {
        withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") {
          UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
            wrapInStreaming(plan), outputMode)
        }
      }
    }
  }
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
  def testWithAllStateVersions(name: String)(func: => Unit): Unit = {
    for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) {
      test(s"$name - state format version $version") {
        withSQLConf(
          SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString,
          SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") {
          func
        }
      }