38 changes: 38 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5940,6 +5940,44 @@
],
"sqlState" : "42K03"
},
"STREAMING_QUERY_EVOLUTION_ERROR" : {
"message" : [
"Streaming query evolution error:"
],
"subClass" : {
"DUPLICATE_SOURCE_NAMES" : {
"message" : [
"Duplicate streaming source names detected: <duplicateNames>. Each streaming source must have a unique name. Please ensure all sources have distinct names using the name() method."
]
},
"INVALID_SOURCE_NAME" : {
"message" : [
"Invalid streaming source name: '<sourceName>'. Source names must only contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores ('_'). Use backticks to quote names with special characters if needed."
]
},
"NAMED_SOURCES_REQUIRE_ENFORCEMENT" : {
"message" : [
"Cannot use the name() method to name streaming sources when spark.sql.streaming.queryEvolution.enableSourceEvolution is disabled. Please enable source evolution by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to true, or remove the name() call."
]
},
"NAMED_SOURCES_REQUIRE_OFFSET_LOG_V2" : {
"message" : [
"Named streaming sources enforcement requires offset log format V2 (OffsetMap), but found V<version>. V2 format uses sourceId:offset pairs which support source evolution. Set spark.sql.streaming.offsetLog.version to 2, or disable named sources enforcement by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to false"
]
},
"TOMBSTONE_SOURCE_NAME_REUSE" : {
"message" : [
"Cannot reuse tombstoned source names: <sourceNames>. These source names were previously used and then removed from the streaming query at checkpoint location <checkpointLocation>. Reusing tombstoned source names can lead to data correctness issues. Please use different source names."
]
},
"UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : {
"message" : [
"All streaming sources must be named when spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed sources found: <sourceInfo>. Use the name() method to assign names to all streaming sources."
]
}
},
"sqlState" : "42000"
},
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",
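For orientation, the entries above are message templates: each subclass key is appended to the parent condition, and the angle-bracket tokens are filled from the messageParameters supplied by the error helpers added later in this diff. Below is a minimal sketch of that substitution — plain string replacement for illustration only, not Spark's actual error-class framework, and the sample source names are made up.

```scala
// Illustration of how a message parameter fills a placeholder in the
// DUPLICATE_SOURCE_NAMES template above. Spark's real implementation lives in
// its error-class framework; this is just plain string substitution.
object ErrorTemplateSketch {
  def main(args: Array[String]): Unit = {
    val condition = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES"
    val template =
      "Duplicate streaming source names detected: <duplicateNames>. " +
        "Each streaming source must have a unique name."
    val message = template.replace("<duplicateNames>", "kafka_events, kafka_events")
    println(s"[$condition] $message")
  }
}
```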
@@ -125,7 +125,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K

val sources: Seq[SparkDataStream] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _, _) => source
case StreamingExecutionRelation(source: KafkaSource, _, _, _) => source
case r: StreamingDataSourceV2ScanRelation
if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
r.stream.isInstanceOf[KafkaContinuousStream] =>
@@ -1615,7 +1615,7 @@ abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBa
makeSureGetOffsetCalled,
AssertOnQuery { query =>
query.logicalPlan.collectFirst {
case StreamingExecutionRelation(_: KafkaSource, _, _) => true
case StreamingExecutionRelation(_: KafkaSource, _, _, _) => true
}.nonEmpty
}
)
@@ -38,6 +38,19 @@ abstract class DataStreamReader {
*/
def format(source: String): this.type

/**
* Assigns a name to this streaming source for source evolution capability.
* When sources are named, they can be added, removed, or reordered without
* losing checkpoint state, enabling query evolution.
*
* If not specified, sources are automatically assigned ordinal names ("0", "1", "2", etc.)
* based on their position in the query, which maintains backward compatibility.
*
* @param sourceName The unique name for this source (ASCII letters, digits, and underscores only)
* @since 4.1.0
*/
private[sql] def name(sourceName: String): this.type

/**
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can skip
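A usage sketch of the API documented above, as it might look in spark-shell once the source-evolution conf introduced later in this diff is enabled. Note that `name()` is currently `private[sql]`, so this is illustrative rather than a public end-user example; the source names and checkpoint path are made up.

```scala
// Sketch: naming two rate sources so the checkpoint can track them by name
// rather than by ordinal position ("0", "1", ...). Requires
// spark.sql.streaming.queryEvolution.enableSourceEvolution=true (see the
// SQLConf changes below) and, per this diff, offset log format V2.
val clicks = spark.readStream.format("rate").name("clicks").load()
val impressions = spark.readStream.format("rate").name("impressions").load()

val query = clicks.union(impressions).writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/source-evolution-demo")
  .start()
```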
@@ -39,7 +39,8 @@ case class StreamingRelationV2(
output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
v1Relation: Option[LogicalPlan])
v1Relation: Option[LogicalPlan],
userProvidedSourceName: Option[String] = None)
extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
override lazy val resolved = v1Relation.forall(_.resolved)
override def isStreaming: Boolean = true
@@ -59,7 +60,7 @@
val newMetadata = metadataOutput.filterNot(outputSet.contains)
if (newMetadata.nonEmpty) {
StreamingRelationV2(source, sourceName, table, extraOptions,
output ++ newMetadata, catalog, identifier, v1Relation)
output ++ newMetadata, catalog, identifier, v1Relation, userProvidedSourceName)
} else {
this
}
@@ -69,5 +70,6 @@
sizeInBytes = BigInt(conf.defaultSizeInBytes)
)

override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))
override def newInstance(): LogicalPlan = this.copy(
output = output.map(_.newInstance()), userProvidedSourceName = userProvidedSourceName)
}
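A side note on the pattern used here (and in `UnresolvedDataSource` and `StreamingDataSourceV2Relation` below): adding `userProvidedSourceName` as a trailing parameter with a `None` default keeps existing constructor call sites source-compatible, and `copy()` carries the value unless it is explicitly overridden. A minimal standalone sketch, with a stub class invented purely for illustration:

```scala
// Illustration only: a stub mimicking the trailing Option field with a default value.
object TrailingFieldSketch {
  case class RelationStub(output: Seq[String], userProvidedSourceName: Option[String] = None)

  def main(args: Array[String]): Unit = {
    val legacy = RelationStub(Seq("a", "b"))                         // old call site still compiles
    val named = legacy.copy(userProvidedSourceName = Some("events")) // opting in to the new field
    val rewritten = named.copy(output = named.output.reverse)        // copy() preserves the name
    assert(rewritten.userProvidedSourceName.contains("events"))
  }
}
```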
@@ -4492,4 +4492,54 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"colType" -> "metadata",
"errors" -> errors.mkString("- ", "\n- ", "")))
}

/**
* Error thrown when streaming source evolution enforcement is enabled but some sources are
* unnamed. Source evolution requires all sources to have explicit names to track them across
* query restarts.
*
* @param sourceInfo formatted string containing information about unnamed sources,
* including their index positions and provider names
* (e.g., "[index=0, provider=kafka], [index=2, provider=delta]")
*/
def unnamedStreamingSourcesWithEnforcementError(sourceInfo: String): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT",
messageParameters = Map("sourceInfo" -> sourceInfo))
}

def namedSourcesRequireOffsetLogV2Error(version: Int): Throwable = {
new SparkException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.NAMED_SOURCES_REQUIRE_OFFSET_LOG_V2",
messageParameters = Map("version" -> version.toString),
cause = null)
}

def duplicateStreamingSourceNamesError(duplicateNames: String): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
messageParameters = Map("duplicateNames" -> duplicateNames))
}

def invalidStreamingSourceNameError(sourceName: String): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
messageParameters = Map("sourceName" -> sourceName))
}

def namedSourcesRequireEnforcementError(): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.NAMED_SOURCES_REQUIRE_ENFORCEMENT",
messageParameters = Map.empty)
}

def tombstoneSourceNameReuseError(
sourceNames: Seq[String],
checkpointLocation: String): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.TOMBSTONE_SOURCE_NAME_REUSE",
messageParameters = Map(
"sourceNames" -> sourceNames.mkString(", "),
"checkpointLocation" -> checkpointLocation))
}
}
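As a hypothetical caller sketch (not code from this PR), this is roughly how the `sourceInfo` string in the `[index=..., provider=...]` shape documented above could be assembled before invoking the new helper. `SourceStub` and `checkAllSourcesNamed` are invented names for illustration.

```scala
// Hypothetical glue code: gather unnamed sources, format them as described in
// the scaladoc above, and raise the corresponding error condition.
// QueryCompilationErrors is the object defined in this file
// (org.apache.spark.sql.errors.QueryCompilationErrors).
case class SourceStub(provider: String, userProvidedSourceName: Option[String])

def checkAllSourcesNamed(sources: Seq[SourceStub]): Unit = {
  val unnamed = sources.zipWithIndex.collect {
    case (s, i) if s.userProvidedSourceName.isEmpty => s"[index=$i, provider=${s.provider}]"
  }
  if (unnamed.nonEmpty) {
    throw QueryCompilationErrors.unnamedStreamingSourcesWithEnforcementError(unnamed.mkString(", "))
  }
}
```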
@@ -207,7 +207,8 @@ case class StreamingDataSourceV2Relation(
identifier: Option[Identifier],
options: CaseInsensitiveStringMap,
metadataPath: String,
realTimeModeDuration: Option[Long] = None)
realTimeModeDuration: Option[Long] = None,
userProvidedSourceName: Option[String] = None)
extends DataSourceV2RelationBase(table, output, catalog, identifier, options) {

override def isStreaming: Boolean = true
@@ -2997,6 +2997,19 @@ object SQLConf {
.checkValue(v => Set(1).contains(v), "Valid version is 1")
.createWithDefault(1)

val STREAMING_OFFSET_LOG_FORMAT_VERSION =
buildConf("spark.sql.streaming.offsetLog.formatVersion")
.doc("Offset log format version used in streaming query checkpoints. " +
"Version 1 uses sequence-based OffsetSeq format, version 2 uses map-based OffsetMap " +
"format which provides better flexibility for source management. " +
"Offset log format versions are incompatible, so this configuration only affects new " +
"streaming queries. Existing queries will continue using the format version from their " +
"checkpoint.")
.version("4.1.0")
.intConf
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(1)

val UNSUPPORTED_OPERATION_CHECK_ENABLED =
buildConf("spark.sql.streaming.unsupportedOperationCheck")
.internal()
@@ -3006,6 +3019,16 @@
.booleanConf
.createWithDefault(true)

val ENABLE_STREAMING_SOURCE_EVOLUTION =
buildConf("spark.sql.streaming.queryEvolution.enableSourceEvolution")
.internal()
.doc("When true, all streaming sources must be named using the name() API. This enables " +
"source evolution capability where sources can be added, removed, or reordered " +
"without losing checkpoint state.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)

val USE_DEPRECATED_KAFKA_OFFSET_FETCHING =
buildConf("spark.sql.streaming.kafka.useDeprecatedOffsetFetching")
.internal()
@@ -6980,6 +7003,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

def enableStreamingSourceEvolution: Boolean = getConf(ENABLE_STREAMING_SOURCE_EVOLUTION)

def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING)

def statefulOperatorCorrectnessCheckEnabled: Boolean =
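For completeness, a small sketch of how these flags would be toggled from a session (for example in spark-shell). Both keys are taken from the definitions above; `enableSourceEvolution` is an internal conf, and, per the doc text, existing checkpoints keep whatever offset log format version they were created with.

```scala
// Opt in to named-source enforcement and the map-based offset log format for
// *new* queries; existing checkpoints keep their original format version.
spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

// Reading the values back through the public conf interface:
val evolutionEnabled = spark.conf.get("spark.sql.streaming.queryEvolution.enableSourceEvolution")
val offsetLogVersion = spark.conf.get("spark.sql.streaming.offsetLog.formatVersion")
```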
@@ -58,6 +58,11 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession)
this
}

// TODO: Add support for naming sources in SparkConnect
private[sql] def name(name: String): this.type = {
throw new UnsupportedOperationException("Naming sources is not supported on Spark Connect")
}

/** @inheritdoc */
def option(key: String, value: String): this.type = {
sourceBuilder.putOptions(key, value)
@@ -43,7 +43,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, false, paths) =>
case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, false, paths, _) =>
// Batch data source created from DataFrameReader
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
@@ -60,7 +60,8 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
source, paths: _*)
}.getOrElse(loadV1BatchSource(source, userSpecifiedSchema, extraOptions, paths: _*))

case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, paths) =>
case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, paths,
userProvidedSourceName) =>
// Streaming data source created from DataStreamReader
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
@@ -83,7 +84,8 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
className = source,
options = optionsWithPath.originalMap)
val v1Relation = ds match {
case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
case _: StreamSourceProvider =>
Some(StreamingRelation(v1DataSource, userProvidedSourceName))
case _ => None
}
ds match {
@@ -107,16 +109,17 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
StreamingRelationV2(
Some(provider), source, table, dsOptions,
toAttributes(table.columns.asSchema), None, None, v1Relation)
toAttributes(table.columns.asSchema),
None, None, v1Relation, userProvidedSourceName)

// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
case _ => StreamingRelation(v1DataSource)
case _ => StreamingRelation(v1DataSource, userProvidedSourceName)
}

case _ =>
// Code path for data source v1.
StreamingRelation(v1DataSource)
StreamingRelation(v1DataSource, userProvidedSourceName)
}

}
@@ -27,7 +27,8 @@ case class UnresolvedDataSource(
userSpecifiedSchema: Option[StructType],
options: CaseInsensitiveMap[String],
override val isStreaming: Boolean,
paths: Seq[String])
paths: Seq[String],
userProvidedSourceName: Option[String] = None)
extends UnresolvedLeafNode {

override def simpleString(maxFields: Int): String = toString
@@ -55,6 +55,17 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)
this
}

/** @inheritdoc */
private[sql] def name(sourceName: String): this.type = {
// Enforce that source evolution must be enabled when naming sources
if (!sparkSession.sessionState.conf.enableStreamingSourceEvolution) {
throw QueryCompilationErrors.namedSourcesRequireEnforcementError()
}
DataStreamReader.validateSourceName(sourceName)
this.userProvidedSourceName = Some(sourceName)
this
}

/** @inheritdoc */
def option(key: String, value: String): this.type = {
this.extraOptions += (key -> value)
@@ -76,7 +87,8 @@
userSpecifiedSchema,
extraOptions,
isStreaming = true,
path.toSeq
path.toSeq,
userProvidedSourceName
)
Dataset.ofRows(sparkSession, unresolved)
}
@@ -161,4 +173,24 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)
private var userSpecifiedSchema: Option[StructType] = None

private var extraOptions = CaseInsensitiveMap[String](Map.empty)

private var userProvidedSourceName: Option[String] = None
}

object DataStreamReader {
/**
* Validates that a source name only contains valid characters.
* Source names must only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_).
* This matches Spark's identifier naming rules and ensures compatibility with filesystems.
*
* @param sourceName the source name to validate
* @throws AnalysisException if the source name contains invalid characters
*/
private[sql] def validateSourceName(sourceName: String): Unit = {
// Source names must only contain ASCII letters, digits, and underscores
val validSourceNamePattern = "^[a-zA-Z0-9_]+$".r
if (!validSourceNamePattern.pattern.matcher(sourceName).matches()) {
throw QueryCompilationErrors.invalidStreamingSourceNameError(sourceName)
}
}
}
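A self-contained sketch of the rule enforced above: the same pattern as `validateSourceName`, exposed as a Boolean check for easy experimentation. The object and method names here are made up and not part of this PR.

```scala
// Mirrors validateSourceName's regex but returns a Boolean instead of throwing.
object SourceNameCheck {
  private val validSourceNamePattern = "^[a-zA-Z0-9_]+$".r

  def isValid(name: String): Boolean =
    validSourceNamePattern.pattern.matcher(name).matches()

  def main(args: Array[String]): Unit = {
    println(isValid("orders_v2"))   // true
    println(isValid("clicks-2024")) // false: hyphens are rejected
    println(isValid("café"))        // false: non-ASCII letters are rejected
    println(isValid(""))            // false: empty names are rejected
  }
}
```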
@@ -305,7 +305,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
userSpecifiedSchema = Some(table.schema),
options = dsOptions,
catalogTable = Some(table))
StreamingRelation(dataSource)
// TODO: [SC-209298] Add API for naming source in ST
StreamingRelation(dataSource, None)
}


@@ -335,7 +336,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
result

case s @ StreamingRelationV2(
_, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
_, _, table, extraOptions, _, _, _, Some(
UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val v1Relation = getStreamingRelation(tableMeta, extraOptions)
if (table.isInstanceOf[SupportsRead]