diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b704f4875f82..8a3fc2e38dd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1094,6 +1094,15 @@ object SQLConf {
       .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
       .createWithDefault(2)
 
+  val STREAMING_STOP_ACTIVE_RUN_ON_RESTART =
+    buildConf("spark.sql.streaming.stopActiveRunOnRestart")
+      .doc("Running multiple runs of the same streaming query concurrently is not supported. " +
+        "If we find a concurrent active run for a streaming query (in the same or different " +
+        "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " +
+        "query run to start the new one.")
+      .booleanConf
+      .createWithDefault(true)
+
   val STREAMING_JOIN_STATE_FORMAT_VERSION =
     buildConf("spark.sql.streaming.join.stateFormatVersion")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index d097f9f18f89..b810bedac471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.net.URL
 import java.util.{Locale, UUID}
 import java.util.concurrent.ConcurrentHashMap
+import javax.annotation.concurrent.GuardedBy
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -32,9 +33,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.streaming.StreamingQueryManager
+import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
@@ -112,11 +114,15 @@ private[sql] class SharedState(
    */
   val cacheManager: CacheManager = new CacheManager
 
+  /** A global lock for all streaming query lifecycle tracking and management. */
+  private[sql] val activeQueriesLock = new Object
+
   /**
    * A map of active streaming queries to the session specific StreamingQueryManager that manages
    * the lifecycle of that stream.
    */
-  private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]()
+  @GuardedBy("activeQueriesLock")
+  private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamExecution]()
 
   /**
    * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
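
Note: the configuration added above is a regular (non-static) SQL conf and is read each time a query starts, so it can be toggled at runtime. A minimal usage sketch (not part of the patch; the session setup and app name are assumed):

    // Hypothetical setup; any existing SparkSession works the same way.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stop-active-run-demo")
      .getOrCreate()

    // Default is true: restarting a query from the same checkpoint stops the
    // currently active run. Setting false keeps the old fail-fast behavior.
    spark.conf.set("spark.sql.streaming.stopActiveRunOnRestart", "false")
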
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 9b43a83e7b94..e64f67cc755f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import java.util.UUID
+import java.util.{ConcurrentModificationException, UUID}
 import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
  * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -51,9 +51,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
 
-  @GuardedBy("activeQueriesLock")
+  @GuardedBy("activeQueriesSharedLock")
   private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
-  private val activeQueriesLock = new Object
+  // A global lock to keep track of active streaming queries across Spark sessions
+  private val activeQueriesSharedLock = sparkSession.sharedState.activeQueriesLock
   private val awaitTerminationLock = new Object
 
   @GuardedBy("awaitTerminationLock")
@@ -77,7 +78,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
    *
    * @since 2.0.0
    */
-  def active: Array[StreamingQuery] = activeQueriesLock.synchronized {
+  def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized {
     activeQueries.values.toArray
   }
 
@@ -86,7 +87,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
    *
    * @since 2.1.0
    */
-  def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
+  def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized {
     activeQueries.get(id).orNull
   }
 
@@ -343,27 +344,61 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
       trigger,
       triggerClock)
 
-    activeQueriesLock.synchronized {
+    // The following code block checks if a stream with the same name or id is running. Then it
+    // returns an Option of an already active stream to stop outside of the lock
+    // to avoid a deadlock.
+    val activeRunOpt = activeQueriesSharedLock.synchronized {
       // Make sure no other query with same name is active
       userSpecifiedName.foreach { name =>
         if (activeQueries.values.exists(_.name == name)) {
-          throw new IllegalArgumentException(
-            s"Cannot start query with name $name as a query with that name is already active")
+          throw new IllegalArgumentException(s"Cannot start query with name $name as a query " +
+            s"with that name is already active in this SparkSession")
         }
       }
 
       // Make sure no other query with same id is active across all sessions
-      val activeOption =
-        Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
-      if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
-        throw new IllegalStateException(
-          s"Cannot start query with id ${query.id} as another query with same id is " +
-            s"already active. Perhaps you are attempting to restart a query from checkpoint " +
-            s"that is already active.")
+      val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
+        .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...
+
+      val shouldStopActiveRun =
+        sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+      if (activeOption.isDefined) {
+        if (shouldStopActiveRun) {
+          val oldQuery = activeOption.get
+          logWarning(s"Stopping existing streaming query [id=${query.id}, " +
+            s"runId=${oldQuery.runId}], as a new run is being started.")
+          Some(oldQuery)
+        } else {
+          throw new IllegalStateException(
+            s"Cannot start query with id ${query.id} as another query with same id is " +
+              s"already active. Perhaps you are attempting to restart a query from checkpoint " +
+              s"that is already active. You may stop the old query by setting the SQL " +
+              "configuration: " +
+              s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true) """ +
+              "and retry.")
+        }
+      } else {
+        // nothing to stop so, no-op
+        None
       }
+    }
+
+    // stop() will clear the queryId from activeStreamingQueries as well as activeQueries
+    activeRunOpt.foreach(_.stop())
+
+    activeQueriesSharedLock.synchronized {
+      // We still can have a race condition when two concurrent instances try to start the same
+      // stream, while a third one was already active and stopped above. In this case, we throw a
+      // ConcurrentModificationException.
+      val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
+        query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
+      if (oldActiveQuery != null) {
+        throw new ConcurrentModificationException(
+          "Another instance of this query was just started by a concurrent session.")
+      }
 
       activeQueries.put(query.id, query)
     }
+
     try {
       // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
       // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
@@ -372,7 +407,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
       query.streamingQuery.start()
     } catch {
       case e: Throwable =>
-        unregisterTerminatedStream(query.id)
+        unregisterTerminatedStream(query)
         throw e
     }
     query
@@ -380,7 +415,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
 
   /** Notify (by the StreamingQuery) that the query has been terminated */
   private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
-    unregisterTerminatedStream(terminatedQuery.id)
+    unregisterTerminatedStream(terminatedQuery)
     awaitTerminationLock.synchronized {
       if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
         lastTerminatedQuery = terminatedQuery
@@ -390,11 +425,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
     stateStoreCoordinator.deactivateInstances(terminatedQuery.runId)
   }
 
-  private def unregisterTerminatedStream(terminatedQueryId: UUID): Unit = {
-    activeQueriesLock.synchronized {
-      // remove from shared state only if the streaming query manager also matches
-      sparkSession.sharedState.activeStreamingQueries.remove(terminatedQueryId, this)
-      activeQueries -= terminatedQueryId
+  private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = {
+    activeQueriesSharedLock.synchronized {
+      // remove from shared state only if the streaming execution also matches
+      sparkSession.sharedState.activeStreamingQueries.remove(
+        terminatedQuery.id, terminatedQuery)
+      activeQueries -= terminatedQuery.id
     }
  }
 }
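
To make the new startQuery semantics concrete, here is an end-to-end sketch (not part of the patch; the rate source, parquet sink, and /tmp paths are made-up illustration). With the flag at its default of true, a second start from the same checkpoint adopts the persisted query id and stops the previous run instead of throwing:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQuery

    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // Both runs share one checkpoint location, hence one query id.
    def startRun(): StreamingQuery = spark.readStream
      .format("rate").load()
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/demo/_checkpoint")  // made-up path
      .start("/tmp/demo/data")                                // made-up path

    val run1 = startRun()
    val run2 = startRun()  // stops run1 first, then registers the new run

    assert(run1.id == run2.id && run1.runId != run2.runId)
    assert(!run1.isActive && run2.isActive)

If two sessions race through this same path concurrently, the second synchronized block in startQuery turns the race into a ConcurrentModificationException rather than two live runs of the same query.
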
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 09580b94056b..96f7efeef98e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, Encoders}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
@@ -274,48 +275,119 @@ class StreamingQueryManagerSuite extends StreamTest {
   }
 
   testQuietly("can't start multiple instances of the same streaming query in the same session") {
-    withTempDir { dir =>
-      val (ms1, ds1) = makeDataset
-      val (ms2, ds2) = makeDataset
-      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
-      val dataLocation = new File(dir, "data").getCanonicalPath
-
-      val query1 = ds1.writeStream.format("parquet")
-        .option("checkpointLocation", chkLocation).start(dataLocation)
-      ms1.addData(1, 2, 3)
-      try {
-        val e = intercept[IllegalStateException] {
-          ds2.writeStream.format("parquet")
-            .option("checkpointLocation", chkLocation).start(dataLocation)
-        }
-        assert(e.getMessage.contains("same id"))
-      } finally {
-        query1.stop()
-      }
-    }
+    withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
+      withTempDir { dir =>
+        val (ms1, ds1) = makeDataset
+        val (ms2, ds2) = makeDataset
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ds1.writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        try {
+          val e = intercept[IllegalStateException] {
+            ds2.writeStream.format("parquet")
+              .option("checkpointLocation", chkLocation).start(dataLocation)
+          }
+          assert(e.getMessage.contains("same id"))
+        } finally {
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
   }
 
+  testQuietly("new instance of the same streaming query stops old query in the same session") {
+    failAfter(90 seconds) {
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+        withTempDir { dir =>
+          val (ms1, ds1) = makeDataset
+          val (ms2, ds2) = makeDataset
+          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+          val dataLocation = new File(dir, "data").getCanonicalPath
+
+          val query1 = ds1.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+          ms1.addData(1, 2, 3)
+          val query2 = ds2.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+          try {
+            ms2.addData(1, 2, 3)
+            query2.processAllAvailable()
+            assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
+              query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
+              "The correct streaming query is not being tracked in global state")
+
+            assert(!query1.isActive,
+              "First query should have stopped before starting the second query")
+          } finally {
+            spark.streams.active.foreach(_.stop())
+          }
+        }
+      }
+    }
+  }
+
   testQuietly(
     "can't start multiple instances of the same streaming query in the different sessions") {
-    withTempDir { dir =>
-      val session2 = spark.cloneSession()
-
-      val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
-      val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
-      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
-      val dataLocation = new File(dir, "data").getCanonicalPath
-
-      val query1 = ms1.toDS().writeStream.format("parquet")
-        .option("checkpointLocation", chkLocation).start(dataLocation)
-      ms1.addData(1, 2, 3)
-      try {
-        val e = intercept[IllegalStateException] {
-          ds2.writeStream.format("parquet")
-            .option("checkpointLocation", chkLocation).start(dataLocation)
-        }
-        assert(e.getMessage.contains("same id"))
-      } finally {
-        query1.stop()
-      }
+    withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
+      withTempDir { dir =>
+        val session2 = spark.cloneSession()
+
+        val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+        val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ms1.toDS().writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        try {
+          val e = intercept[IllegalStateException] {
+            ds2.writeStream.format("parquet")
+              .option("checkpointLocation", chkLocation).start(dataLocation)
+          }
+          assert(e.getMessage.contains("same id"))
+        } finally {
+          spark.streams.active.foreach(_.stop())
+          session2.streams.active.foreach(_.stop())
+        }
+      }
+    }
   }
 
+  testQuietly(
+    "new instance of the same streaming query stops old query in a different session") {
+    failAfter(90 seconds) {
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+        withTempDir { dir =>
+          val session2 = spark.cloneSession()
+
+          val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+          val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+          val dataLocation = new File(dir, "data").getCanonicalPath
+
+          val query1 = ms1.toDS().writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+          ms1.addData(1, 2, 3)
+          val query2 = ds2.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+          try {
+            ms1.addData(1, 2, 3)
+            query2.processAllAvailable()
+            assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
+              query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
+              "The correct streaming execution is not being tracked in global state")
+
+            assert(!query1.isActive,
+              "First query should have stopped before starting the second query")
+          } finally {
+            spark.streams.active.foreach(_.stop())
+            session2.streams.active.foreach(_.stop())
+          }
+        }
+      }
+    }
+  }
.option("checkpointLocation", chkLocation).start(dataLocation) + try { + ms1.addData(1, 2, 3) + query2.processAllAvailable() + assert(spark.sharedState.activeStreamingQueries.get(query2.id) === + query2.asInstanceOf[StreamingQueryWrapper].streamingQuery, + "The correct streaming execution is not being tracked in global state") + + assert(!query1.isActive, + "First query should have stopped before starting the second query") + } finally { + spark.streams.active.foreach(_.stop()) + session2.streams.active.foreach(_.stop()) + } } - assert(e.getMessage.contains("same id")) - } finally { - query1.stop() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 760731d26f05..4121f499bd69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -123,9 +123,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(q3.runId !== q4.runId) // Only one query with same id can be active - val q5 = startQuery(restart = false) - val e = intercept[IllegalStateException] { - startQuery(restart = true) + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") { + val q5 = startQuery(restart = false) + val e = intercept[IllegalStateException] { + startQuery(restart = true) + } } } }