Skip to content
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6554,6 +6554,11 @@
"State TTL with <stateStoreProvider> is not supported. Please use RocksDBStateStoreProvider."
]
},
"STORE_BACKEND_NOT_SUPPORTED_FOR_TWS" : {
"message" : [
"Store backend <stateStoreProvider> is not supported by TransformWithState operator. Please use RocksDBStateStoreProvider."
]
},
"TABLE_OPERATION" : {
"message" : [
"Table <tableName> does not support <operation>. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ case class TransformWithStateInPySparkExec(
*/
override protected def doExecute(): RDD[InternalRow] = {
metrics
validateStateStoreProvider()

if (!hasInitialState) {
if (isStreaming) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ case class TransformWithStateExec(
metrics // force lazy init at driver

validateTimeMode()
validateStateStoreProvider()

if (hasInitialState) {
val storeConf = new StateStoreConf(session.sessionState.conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, StateStoreWriter, WatermarkSupport}
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, TransformWithStateUserFunctionException}
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, RocksDBStateStoreProvider, StateStoreErrors, TransformWithStateUserFunctionException}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
import org.apache.spark.sql.types.{BinaryType, StructType}
import org.apache.spark.util.NextIterator
Expand All @@ -51,6 +52,12 @@ abstract class TransformWithStateExecBase(
with WatermarkSupport
with TransformWithStateMetadataUtils {

// Fully-qualified class names of state store providers that the
// TransformWithState operator can run against. At the moment only the
// RocksDB-backed provider is supported.
private val SUPPORTED_STATE_STORE_PROVIDERS: Set[String] =
  Set(classOf[RocksDBStateStoreProvider].getName)

// Version of the operator state metadata format used by this operator (v2).
override def operatorStateMetadataVersion: Int = 2

// This operator allows its state schemas to evolve across query restarts.
override def supportsSchemaEvolution: Boolean = true
Expand Down Expand Up @@ -216,6 +223,14 @@ abstract class TransformWithStateExecBase(
}
}

/**
 * Checks that the session's configured state store provider is one that the
 * TransformWithState operator supports.
 *
 * @throws StateStoreBackendNotSupportedForTWSException if the provider configured
 *         via `spark.sql.streaming.stateStore.providerClass` is not supported.
 */
protected def validateStateStoreProvider(): Unit = {
  conf.getConf(SQLConf.STATE_STORE_PROVIDER_CLASS) match {
    case supported if SUPPORTED_STATE_STORE_PROVIDERS.contains(supported) =>
      // Configured backend is supported; nothing to do.
    case unsupported =>
      throw StateStoreErrors.storeBackendNotSupportedForTWS(unsupported)
  }
}

/**
* Executes a block of code with standardized error handling for StatefulProcessor operations.
* Rethrows SparkThrowables directly and wraps other exceptions in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ object StateStoreErrors {
new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
}

/**
 * Builds the exception raised when the TransformWithState operator is executed
 * against a state store backend it does not support.
 */
def storeBackendNotSupportedForTWS(
    stateStoreProvider: String): StateStoreBackendNotSupportedForTWSException =
  new StateStoreBackendNotSupportedForTWSException(stateStoreProvider)

def cannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String):
StateStoreCannotUseColumnFamilyWithInvalidName = {
new StateStoreCannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
Expand Down Expand Up @@ -313,6 +318,11 @@ class StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider:
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
messageParameters = Map("stateStoreProvider" -> stateStoreProvider))

/**
 * Thrown when the TransformWithState operator is run with an unsupported state
 * store provider. Carries the `UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS`
 * error class with the offending provider class name as a message parameter.
 */
class StateStoreBackendNotSupportedForTWSException(stateStoreProvider: String)
extends SparkUnsupportedOperationException(
errorClass = "UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS",
messageParameters = Map("stateStoreProvider" -> stateStoreProvider))

class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String, colFamilyName: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2570,7 +2570,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {

testStream(result, OutputMode.Update())(
AddData(inputData, "a"),
ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { t =>
ExpectFailure[StateStoreBackendNotSupportedForTWSException] { t =>
assert(t.getMessage.contains("not supported"))
}
)
Expand Down Expand Up @@ -2836,7 +2836,7 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {
)
testStream(result, OutputMode.Update())(
AddData(inputData, InitInputRow("a", "add", -1.0)),
ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] {
ExpectFailure[StateStoreBackendNotSupportedForTWSException] {
(t: Throwable) => {
assert(t.getMessage.contains("not supported"))
}
Expand Down