Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.internal

import java.net.URL
import java.util.Locale
import java.util.{Locale, UUID}
import java.util.concurrent.ConcurrentHashMap

import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand All @@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -110,6 +112,12 @@ private[sql] class SharedState(
*/
val cacheManager: CacheManager = new CacheManager

/**
* A map of active streaming queries to the session specific StreamingQueryManager that manages
* the lifecycle of that stream.
*/
private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]()

/**
* A status store to query SQL status/metrics of this Spark application, based on SQL-specific
* [[org.apache.spark.scheduler.SparkListenerEvent]]s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
}

// Make sure no other query with same id is active
if (activeQueries.values.exists(_.id == query.id)) {
// Make sure no other query with same id is active across all sessions
val activeOption =
Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: are you seeing the actual case where activeOption is None but activeQueries contain such query? I'm not seeing the case, though I don't think adding this would hurt.

Copy link
Member

@dongjoon-hyun dongjoon-hyun Oct 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that one ) is missed after query.id). Style check and compilation failure.

[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala: Expected token RPAREN but got Token(DOT,.,14050,.)
[error] Total time: 20 s, completed Oct 12, 2019 4:11:49 PM

throw new IllegalStateException(
s"Cannot start query with id ${query.id} as another query with same id is " +
s"already active. Perhaps you are attempting to restart a query from checkpoint " +
Expand All @@ -370,19 +372,15 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
query.streamingQuery.start()
} catch {
case e: Throwable =>
activeQueriesLock.synchronized {
activeQueries -= query.id
}
unregisterTerminatedStream(query.id)
throw e
}
query
}

/** Notify (by the StreamingQuery) that the query has been terminated */
private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
activeQueriesLock.synchronized {
activeQueries -= terminatedQuery.id
}
unregisterTerminatedStream(terminatedQuery.id)
awaitTerminationLock.synchronized {
if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
lastTerminatedQuery = terminatedQuery
Expand All @@ -391,4 +389,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
stateStoreCoordinator.deactivateInstances(terminatedQuery.runId)
}

private def unregisterTerminatedStream(terminatedQueryId: UUID): Unit = {
activeQueriesLock.synchronized {
// remove from shared state only if the streaming query manager also matches
sparkSession.sharedState.activeStreamingQueries.remove(terminatedQueryId, this)
activeQueries -= terminatedQueryId
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.streaming

import java.io.File
import java.util.concurrent.CountDownLatch

import scala.concurrent.Future
Expand All @@ -28,7 +29,7 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.util.BlockingSource
Expand Down Expand Up @@ -242,6 +243,83 @@ class StreamingQueryManagerSuite extends StreamTest {
}
}

testQuietly("can't start a streaming query with the same name in the same session") {
val ds1 = makeDataset._2
val ds2 = makeDataset._2
val queryName = "abc"

val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
try {
val e = intercept[IllegalArgumentException] {
ds2.writeStream.format("noop").queryName(queryName).start()
}
assert(e.getMessage.contains("query with that name is already active"))
} finally {
query1.stop()
}
}

testQuietly("can start a streaming query with the same name in a different session") {
val session2 = spark.cloneSession()

val ds1 = MemoryStream(Encoders.INT, spark.sqlContext).toDS()
val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
val queryName = "abc"

val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
val query2 = ds2.writeStream.format("noop").queryName(queryName).start()

query1.stop()
query2.stop()
}

testQuietly("can't start multiple instances of the same streaming query in the same session") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case names seems to be switched, @brkyvz .
in the same session -> in the different sessions?

withTempDir { dir =>
val session2 = spark.cloneSession()

val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
val dataLocation = new File(dir, "data").getCanonicalPath

val query1 = ms1.toDS().writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
ms1.addData(1, 2, 3)
try {
val e = intercept[IllegalStateException] {
ds2.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
}
assert(e.getMessage.contains("same id"))
} finally {
query1.stop()
}
}
}

testQuietly(
Copy link
Contributor

@HeartSaVioR HeartSaVioR Oct 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a sake of understanding, this patch is intended to prevent starting multiple instances of the same streaming query in the different sessions (while it was allowed to do this and it would probably incur some problem), right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. This is the specific test which would have previously failed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, this test case doesn't fail without this patch because this is for single session test case. The above test case (line 276) fails without this patch correctly.

"can't start multiple instances of the same streaming query in the different sessions") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the test body, in the different sessions -> in the same session?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. I named them wrong :)

withTempDir { dir =>
val (ms1, ds1) = makeDataset
val (ms2, ds2) = makeDataset
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
val dataLocation = new File(dir, "data").getCanonicalPath

val query1 = ds1.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
ms1.addData(1, 2, 3)
try {
val e = intercept[IllegalStateException] {
ds2.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
}
assert(e.getMessage.contains("same id"))
} finally {
query1.stop()
}
}
}

/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
failAfter(streamingTimeout) {
Expand Down