
Commit a660152

tests compile and pass
1 parent 1cdc3f9 commit a660152

File tree

8 files changed: +76 additions, −15 deletions


connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -36,13 +36,13 @@ import org.scalatest.matchers.should._
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.{SparkException, TestUtils}
-import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter, Row, SparkSession}
+import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetSeqBase, TombstoneOffset}
+import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqBase
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.runtime.{MicroBatchExecution, StreamExecution, StreamingExecutionRelation}
 import org.apache.spark.sql.execution.streaming.runtime.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 1 deletion
@@ -3026,7 +3026,6 @@ object SQLConf {
       "source evolution capability where sources can be added, removed, or reordered " +
       "without losing checkpoint state.")
     .version("4.2.0")
-    .owner("streaming-engine")
     .booleanConf
     .createWithDefault(false)
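For orientation, this entry follows the standard SQLConf builder chain; a rough sketch of the full definition is below. The val name and config key are assumptions for illustration only (the hunk shows just the tail of the entry), and the sketch simply omits the removed .owner(...) call, which appears not to be part of the upstream ConfigBuilder API.

  // Sketch only: the val name and config key are hypothetical; doc text, version,
  // type, and default are taken from the hunk above.
  val STREAMING_SOURCE_EVOLUTION_ENABLED =
    buildConf("spark.sql.streaming.sourceEvolution.enabled")
      .doc("Enables the streaming source evolution capability where sources can be " +
        "added, removed, or reordered without losing checkpoint state.")
      .version("4.2.0")
      .booleanConf
      .createWithDefault(false)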

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 4 additions & 2 deletions
@@ -305,7 +305,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       userSpecifiedSchema = Some(table.schema),
       options = dsOptions,
       catalogTable = Some(table))
-    StreamingRelation(dataSource)
+    // TODO: [SC-209298] Add API for naming source in ST
+    StreamingRelation(dataSource, None)
   }


@@ -335,7 +336,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       result

     case s @ StreamingRelationV2(
-        _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
+        _, _, table, extraOptions, _, _, _, Some(
+          UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       val v1Relation = getStreamingRelation(tableMeta, extraOptions)
       if (table.isInstanceOf[SupportsRead]

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ class ContinuousExecution(
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
       case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output,
-          catalog, identifier, _) =>
+          catalog, identifier, _, userProvidedName) =>
         val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
         if (!table.supports(TableCapability.CONTINUOUS_READ)) {
           throw QueryExecutionErrors.continuousProcessingUnsupportedByDataSourceError(sourceName)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 9 additions & 6 deletions
@@ -184,7 +184,8 @@ class MicroBatchExecution(

     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
+      case streamingRelation @ StreamingRelation(
+          dataSourceV1, sourceName, output, userProvidedSourceName) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -193,11 +194,12 @@
           logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] " +
             log"from DataSourceV1 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
             log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]")
-          StreamingExecutionRelation(source, output, dataSourceV1.catalogTable)(sparkSession)
+          StreamingExecutionRelation(
+            source, output, dataSourceV1.catalogTable, userProvidedSourceName)(sparkSession)
         })

       case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output,
-          catalog, identifier, v1) =>
+          catalog, identifier, v1, userProvidedSourceName) =>
         val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
         val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
@@ -221,7 +223,8 @@
             trigger match {
               case RealTimeTrigger(duration) => Some(duration)
               case _ => None
-            }
+            },
+            userProvidedSourceName
           )
           StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
         })
@@ -240,7 +243,7 @@
             log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
           // We don't have a catalog table but may have a table identifier. Given this is about
           // v1 fallback path, we just give up and set the catalog table as None.
-          StreamingExecutionRelation(source, output, None)(sparkSession)
+          StreamingExecutionRelation(source, output, None, userProvidedSourceName)(sparkSession)
         })
       }
     }
@@ -932,7 +935,7 @@
     // Replace sources in the logical plan with data that has arrived since the last batch.
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
-      case StreamingExecutionRelation(source, output, catalogTable) =>
+      case StreamingExecutionRelation(source, output, catalogTable, _) =>
         mutableNewData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
             case FileSourceMetadataAttribute(_) => true

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala

Lines changed: 51 additions & 0 deletions
@@ -48,6 +48,14 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
         log"is not supported in streaming DataFrames/Datasets and will be disabled.")
     }

+    // Always check for duplicate source names
+    checkDuplicateSourceNames(s.inputQuery)
+
+    // Check for unnamed sources when enforcement is enabled
+    if (conf.enableStreamingSourceEvolution) {
+      validateNamedSources(s.inputQuery)
+    }
+
     if (conf.isUnsupportedOperationCheckEnabled) {
       if (s.trigger.isInstanceOf[RealTimeTrigger]) {
         UnsupportedOperationChecker.
@@ -143,5 +151,48 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
       log"resolved to ${MDC(CHECKPOINT_ROOT, resolvedCheckpointRoot)}.")
     (resolvedCheckpointRoot, deleteCheckpointOnStop)
   }
+
+  /**
+   * Checks for duplicate source names across all streaming sources.
+   * This validation always runs regardless of source evolution enforcement.
+   */
+  private def checkDuplicateSourceNames(plan: LogicalPlan): Unit = {
+    import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+
+    val sourcesWithNames = plan.collect {
+      case StreamingRelation(_, _, _, userProvidedSourceName) => userProvidedSourceName
+      case StreamingRelationV2(
+        _, _, _, _, _, _, _, _, userProvidedSourceName) => userProvidedSourceName
+    }
+
+    // Check for duplicate source names among named sources
+    val namedSources = sourcesWithNames.flatten
+    val duplicates = namedSources.groupBy(identity).filter(_._2.size > 1).keys.toSeq.sorted
+
+    if (duplicates.nonEmpty) {
+      throw QueryCompilationErrors.duplicateStreamingSourceNamesError(
+        duplicates.map(name => s"'$name'").mkString(", "))
+    }
+  }
+
+  /**
+   * Validates that all streaming sources have names when named source enforcement is enabled.
+   */
+  private def validateNamedSources(plan: LogicalPlan): Unit = {
+    import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+
+    val unnamedSources = plan.collect {
+      case StreamingRelation(ds, sourceName, _, None) =>
+        s"[index=${sourceName}, provider=${ds.providingClass.getSimpleName}]"
+      case StreamingRelationV2(src, srcName, table, _, _, _, _, _, None) =>
+        val provider = src.map(_.getClass.getSimpleName).getOrElse(table.getClass.getSimpleName)
+        s"[index=${srcName}, provider=${provider}]"
+    }
+
+    if (unnamedSources.nonEmpty) {
+      throw QueryCompilationErrors.unnamedStreamingSourcesWithEnforcementError(
+        unnamedSources.mkString(", "))
+    }
+  }
 }
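As a standalone illustration of the groupBy-based duplicate detection used in checkDuplicateSourceNames above, the same idiom can be exercised on plain strings; the object name and sample values below are made up for the sketch.

  // Minimal, self-contained sketch of the duplicate-detection idiom; inputs are illustrative.
  object DuplicateNameCheckSketch {
    def duplicates(names: Seq[String]): Seq[String] =
      names.groupBy(identity).filter(_._2.size > 1).keys.toSeq.sorted

    def main(args: Array[String]): Unit = {
      // "events" appears twice, so it is the only name reported.
      println(duplicates(Seq("events", "metrics", "events")))  // prints: List(events)
    }
  }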

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala

Lines changed: 6 additions & 0 deletions
@@ -37,6 +37,12 @@ object StreamingRelation {
     StreamingRelation(
       dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema))
   }
+
+  def apply(dataSource: DataSource, userProvidedSourceName: Option[String]): StreamingRelation = {
+    StreamingRelation(
+      dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
+      userProvidedSourceName)
+  }
 }

 /**

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -190,15 +190,15 @@ abstract class FileStreamSourceTest
   protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     df.queryExecution.analyzed
-      .collect { case StreamingRelation(dataSource, _, _) =>
+      .collect { case StreamingRelation(dataSource, _, _, _) =>
         // There is only one source in our tests so just set sourceId to 0
         dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
       }.head
   }

   protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
     query.logicalPlan.collect {
-      case StreamingExecutionRelation(source, _, _) if source.isInstanceOf[FileStreamSource] =>
+      case StreamingExecutionRelation(source, _, _, _) if source.isInstanceOf[FileStreamSource] =>
         source.asInstanceOf[FileStreamSource]
     }
   }
@@ -251,7 +251,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       reader.load()
     }
     df.queryExecution.analyzed
-      .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
+      .collect { case s @ StreamingRelation(dataSource, _, _, _) => s.schema }.head
   }

   override def beforeAll(): Unit = {
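The test-side churn above is mechanical: once a case class gains a field, every extractor pattern must list it (or ignore it with _). A toy, self-contained sketch of that effect, using made-up types rather than Spark's:

  // Adding a fourth field changes the arity of the generated unapply, so old
  // three-field patterns stop compiling and each gains a trailing `_`.
  case class Relation(source: String, output: Seq[String], catalogTable: Option[String],
      userProvidedSourceName: Option[String] = None)

  object ArityChangeSketch {
    def main(args: Array[String]): Unit = {
      val r = Relation("file", Seq("value"), None)
      r match {
        case Relation(source, _, _, _) => println(s"matched source = $source")
      }
    }
  }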
