diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index f1a648176c3b3..d097f9f18f89b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.internal import java.net.URL -import java.util.Locale +import java.util.{Locale, UUID} +import java.util.concurrent.ConcurrentHashMap import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.Utils @@ -110,6 +112,12 @@ private[sql] class SharedState( */ val cacheManager: CacheManager = new CacheManager + /** + * A map of active streaming queries to the session specific StreamingQueryManager that manages + * the lifecycle of that stream. + */ + private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]() + /** * A status store to query SQL status/metrics of this Spark application, based on SQL-specific * [[org.apache.spark.scheduler.SparkListenerEvent]]s. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 9abe38dfda0be..9b43a83e7b94a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -352,8 +352,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } } - // Make sure no other query with same id is active - if (activeQueries.values.exists(_.id == query.id)) { + // Make sure no other query with same id is active across all sessions + val activeOption = + Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) + if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + s"already active. Perhaps you are attempting to restart a query from checkpoint " + @@ -370,9 +372,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo query.streamingQuery.start() } catch { case e: Throwable => - activeQueriesLock.synchronized { - activeQueries -= query.id - } + unregisterTerminatedStream(query.id) throw e } query @@ -380,9 +380,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo /** Notify (by the StreamingQuery) that the query has been terminated */ private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = { - activeQueriesLock.synchronized { - activeQueries -= terminatedQuery.id - } + unregisterTerminatedStream(terminatedQuery.id) awaitTerminationLock.synchronized { if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { lastTerminatedQuery = terminatedQuery @@ -391,4 +389,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } stateStoreCoordinator.deactivateInstances(terminatedQuery.runId) } + + private def unregisterTerminatedStream(terminatedQueryId: UUID): Unit = { + activeQueriesLock.synchronized { + // remove from shared state only if the streaming query manager also matches + sparkSession.sharedState.activeStreamingQueries.remove(terminatedQueryId, this) + activeQueries -= terminatedQueryId + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index b26d2556b2e36..09580b94056b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming +import java.io.File import java.util.concurrent.CountDownLatch import scala.concurrent.Future @@ -28,7 +29,7 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.{Dataset, Encoders} import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.util.BlockingSource @@ -242,6 +243,83 @@ class StreamingQueryManagerSuite extends StreamTest { } } + testQuietly("can't start a streaming query with the same name in the same session") { + val ds1 = makeDataset._2 + val ds2 = makeDataset._2 + val queryName = "abc" + + val query1 = ds1.writeStream.format("noop").queryName(queryName).start() + try { + val e = intercept[IllegalArgumentException] { + ds2.writeStream.format("noop").queryName(queryName).start() + } + assert(e.getMessage.contains("query with that name is already active")) + } finally { + query1.stop() + } + } + + testQuietly("can start a streaming query with the same name in a different session") { + val session2 = spark.cloneSession() + + val ds1 = MemoryStream(Encoders.INT, spark.sqlContext).toDS() + val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() + val queryName = "abc" + + val query1 = ds1.writeStream.format("noop").queryName(queryName).start() + val query2 = ds2.writeStream.format("noop").queryName(queryName).start() + + query1.stop() + query2.stop() + } + + testQuietly("can't start multiple instances of the same streaming query in the same session") { + withTempDir { dir => + val (ms1, ds1) = makeDataset + val (ms2, ds2) = makeDataset + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ds1.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + try { + val e = intercept[IllegalStateException] { + ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + } + assert(e.getMessage.contains("same id")) + } finally { + query1.stop() + } + } + } + + testQuietly( + "can't start multiple instances of the same streaming query in the different sessions") { + withTempDir { dir => + val session2 = spark.cloneSession() + + val ms1 = MemoryStream(Encoders.INT, spark.sqlContext) + val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ms1.toDS().writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + try { + val e = intercept[IllegalStateException] { + ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + } + assert(e.getMessage.contains("same id")) + } finally { + query1.stop() + } + } + } + /** Run a body of code by defining a query on each dataset */ private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = { failAfter(streamingTimeout) {