[SPARK-29568][SS] Stop existing running streams when a new stream is launched #26225
Changes from 2 commits
```diff
@@ -1087,6 +1087,14 @@ object SQLConf {
     .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
     .createWithDefault(2)

+  val STOP_RUNNING_DUPLICATE_STREAM = buildConf("spark.sql.streaming.stopExistingDuplicateStream")
+    .doc("Running two streams using the same checkpoint location concurrently is not supported. " +
+      "In the case where multiple streams are started on different SparkSessions, access to the " +
+      "older stream's SparkSession may not be possible, and the stream may have turned into a " +
+      "zombie stream. When this flag is true, we will stop the old stream to start the new one.")
+    .booleanConf
+    .createWithDefault(true)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     buildConf("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
```
Member:
Shall we have …
Contributor (Author):
Great question. Here's my argument for why we should change it:
I would argue that 3 is more common than 2, and including 1, this is where we can change the behavior and mention it in the release notes.
Member:
+1 for the release notes.
Contributor:
nit: I think the docs can be better. Here are the confusing parts.
Contributor:
+1 on the name now. I like it.
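For anyone trying the new flag out, here is a minimal sketch of toggling it at runtime. It assumes a live SparkSession named `spark`; only the config key is taken from the diff above.

```scala
// Keep the old fail-fast behavior instead of stopping the existing stream
// (the flag defaults to true in this change).
spark.conf.set("spark.sql.streaming.stopExistingDuplicateStream", "false")
```

The same key can be set at submit time with `--conf spark.sql.streaming.stopExistingDuplicateStream=false`.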
```diff
@@ -355,11 +355,22 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
     // Make sure no other query with same id is active across all sessions
     val activeOption =
       Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
-    if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
+
+    val streamAlreadyActive =
+      activeOption.isDefined || activeQueries.values.exists(_.id == query.id)
+    val turnOffOldStream =
+      sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM)
+    if (streamAlreadyActive && turnOffOldStream) {
+      val queryManager = activeOption.getOrElse(this)
+      logInfo(s"Stopping existing streaming query [id=${query.id}], as a new run is being " +
+        "started.")
+      queryManager.get(query.id).stop()
+    } else if (streamAlreadyActive) {
       throw new IllegalStateException(
-        s"Cannot start query with id ${query.id} as another query with same id is " +
-        s"already active. Perhaps you are attempting to restart a query from checkpoint " +
-        s"that is already active.")
+        s"Cannot start query with id ${query.id} as another query with same id is " +
+          "already active. Perhaps you are attempting to restart a query from checkpoint " +
+          "that is already active. You may stop the old query by setting the SQL " +
+          s"""configuration: spark.conf.set("${SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key}", true).""")
     }

     activeQueries.put(query.id, query)
```
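The duplicate check above leans on `ConcurrentHashMap.putIfAbsent` over `sharedState.activeStreamingQueries`: the first manager to register a query id wins atomically, and a losing registrar gets back the winning manager so it can either stop the old run or refuse the new one. Below is a minimal, self-contained sketch of that registration pattern; the `RegistryDemo` names are hypothetical, not Spark code.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for SharedState.activeStreamingQueries: query id -> owning session.
object RegistryDemo {
  private val active = new ConcurrentHashMap[UUID, String]()

  // Returns None if we registered first, or Some(previousOwner) if the id was already taken.
  def register(id: UUID, owner: String): Option[String] =
    Option(active.putIfAbsent(id, owner))

  def main(args: Array[String]): Unit = {
    val id = UUID.randomUUID()
    assert(register(id, "session-1").isEmpty)                 // first start wins
    assert(register(id, "session-2").contains("session-1"))   // duplicate detected atomically
  }
}
```

Relying on `putIfAbsent`'s return value, rather than a separate contains-then-put, is what makes the race between two sessions starting the same query safe.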
```diff
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, Encoders}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
@@ -274,48 +275,102 @@ class StreamingQueryManagerSuite extends StreamTest {
   }

   testQuietly("can't start multiple instances of the same streaming query in the same session") {
-    withTempDir { dir =>
-      val (ms1, ds1) = makeDataset
-      val (ms2, ds2) = makeDataset
-      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
-      val dataLocation = new File(dir, "data").getCanonicalPath
-
-      val query1 = ds1.writeStream.format("parquet")
-        .option("checkpointLocation", chkLocation).start(dataLocation)
-      ms1.addData(1, 2, 3)
-      try {
-        val e = intercept[IllegalStateException] {
-          ds2.writeStream.format("parquet")
-            .option("checkpointLocation", chkLocation).start(dataLocation)
+    withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") {
+      withTempDir { dir =>
+        val (ms1, ds1) = makeDataset
+        val (ms2, ds2) = makeDataset
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ds1.writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        try {
+          val e = intercept[IllegalStateException] {
+            ds2.writeStream.format("parquet")
+              .option("checkpointLocation", chkLocation).start(dataLocation)
+          }
+          assert(e.getMessage.contains("same id"))
+        } finally {
+          query1.stop()
+        }
       }
-      assert(e.getMessage.contains("same id"))
-    } finally {
-      query1.stop()
     }
   }

+  testQuietly("new instance of the same streaming query stops old query in the same session") {
+    withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") {
+      withTempDir { dir =>
+        val (ms1, ds1) = makeDataset
+        val (ms2, ds2) = makeDataset
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ds1.writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        val query2 = ds2.writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms2.addData(1, 2, 3)
+        query2.processAllAvailable()
+
+        assert(!query1.isActive, "First query should have stopped before starting the second query")
+      }
+    }
+  }
+
   testQuietly(
     "can't start multiple instances of the same streaming query in the different sessions") {
-    withTempDir { dir =>
-      val session2 = spark.cloneSession()
-
-      val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
-      val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
-      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
-      val dataLocation = new File(dir, "data").getCanonicalPath
-
-      val query1 = ms1.toDS().writeStream.format("parquet")
-        .option("checkpointLocation", chkLocation).start(dataLocation)
-      ms1.addData(1, 2, 3)
-      try {
-        val e = intercept[IllegalStateException] {
-          ds2.writeStream.format("parquet")
-            .option("checkpointLocation", chkLocation).start(dataLocation)
+    withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") {
+      withTempDir { dir =>
+        val session2 = spark.cloneSession()
+
+        val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+        val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ms1.toDS().writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        try {
+          val e = intercept[IllegalStateException] {
+            ds2.writeStream.format("parquet")
+              .option("checkpointLocation", chkLocation).start(dataLocation)
+          }
+          assert(e.getMessage.contains("same id"))
+        } finally {
+          query1.stop()
+        }
       }
-      assert(e.getMessage.contains("same id"))
-    } finally {
-      query1.stop()
     }
   }
+
+  testQuietly(
+    "new instance of the same streaming query stops old query in a different session") {
+    withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") {
+      withTempDir { dir =>
+        val session2 = spark.cloneSession()
+
+        val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+        val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ms1.toDS().writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        val query2 = ds2.writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        try {
+          ms1.addData(1, 2, 3)
+          query2.processAllAvailable()
+
+          assert(!query1.isActive,
+            "First query should have stopped before starting the second query")
+        } finally {
+          query2.stop()
+        }
+      }
+    }
+  }
```
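Taken together, the tests pin down the user-visible contract. Here is a hedged, illustrative sketch of the new default path, assuming a SparkSession `spark` and a streaming Dataset `ds`; the paths are placeholders, not values from this PR.

```scala
// With the flag at its new default (true), a second start from the same
// checkpoint stops the first run instead of throwing IllegalStateException.
val q1 = ds.writeStream.format("parquet")
  .option("checkpointLocation", "/tmp/chk")
  .start("/tmp/out")

val q2 = ds.writeStream.format("parquet")
  .option("checkpointLocation", "/tmp/chk")   // same checkpoint => same query id
  .start("/tmp/out")

// Per the manager change above, the old run is stopped before the new one starts.
assert(!q1.isActive)
assert(q2.isActive)
```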
Reviewer:
stopExistingDuplicateStream -> stopExistingDuplicatedStream?