From f91f365a34db55834f2f5f2eec65ece0eacfd29c Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Mon, 10 Sep 2018 14:40:46 -0700
Subject: [PATCH 1/6] Set a task context property to indicate we are using
 continuous processing

---
 .../execution/streaming/continuous/ContinuousExecution.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 4ddebb33b79d..01bd5cf6271f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -209,6 +209,8 @@ class ContinuousExecution(
       scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig
     }.head
 
+    sparkSessionForQuery.sparkContext.setLocalProperty(
+      ContinuousExecution.IS_CONTINUOUS_PROCESSING, true.toString)
     sparkSessionForQuery.sparkContext.setLocalProperty(
       ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
     // Add another random ID on top of the run ID, to distinguish epoch coordinators across
@@ -391,6 +393,7 @@ class ContinuousExecution(
 }
 
 object ContinuousExecution {
+  val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
   val START_EPOCH_KEY = "__continuous_start_epoch"
   val EPOCH_COORDINATOR_ID_KEY = "__epoch_coordinator_id"
   val EPOCH_INTERVAL_KEY = "__continuous_epoch_interval"

From 2b86b2f79434585c9e2974f37c1d97b5830f0ef5 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Mon, 10 Sep 2018 14:58:17 -0700
Subject: [PATCH 2/6] Use property to decide which version instead of reading
 ThreadLocal variable

---
 .../execution/streaming/state/StateStoreRDD.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 3f11b8f79943..474ed5717ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.streaming.continuous.EpochTracker
+import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochTracker}
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
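Patches 1 and 2 together establish the mechanism end to end: the driver stamps a local property on the SparkContext before launching the query, and every task scheduled from that thread can read the property back through its TaskContext. A minimal, self-contained sketch of that propagation (the key string matches the patch; the object, method names, and demo job are illustrative, not code from the patch):

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.SparkSession

    object LocalPropertyDemo {
      // Same key the patch defines; everything else here is demo-only.
      val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
        val sc = spark.sparkContext

        // Driver side: local properties set on this thread are inherited by
        // every task it submits, which is how ContinuousExecution tags a run.
        sc.setLocalProperty(IS_CONTINUOUS_PROCESSING, true.toString)

        // Executor side: each task reads the property from its TaskContext,
        // mirroring what StateStoreRDD.compute does in patch 2.
        val seen = sc.parallelize(1 to 4, 2)
          .map(_ => Option(TaskContext.get().getLocalProperty(IS_CONTINUOUS_PROCESSING)))
          .collect()

        seen.foreach(println) // prints Some(true) once per element
        spark.stop()
      }
    }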
-    val currentVersion = EpochTracker.getCurrentEpoch match {
-      case None => storeVersion
-      case Some(value) => value
+    val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+      .map(_.toBoolean)
+    val currentVersion = if (isContinuous.contains(true)) {
+      val epoch = EpochTracker.getCurrentEpoch
+      assert(epoch.isDefined, "Current epoch must be defined for continuous processing streams.")
+      epoch.get
+    } else {
+      storeVersion
     }
 
     store = StateStore.get(

From 0cf07515eebf609a0aec0c0b7ede934cc4465e21 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Mon, 10 Sep 2018 16:51:18 -0700
Subject: [PATCH 3/6] Set is continuous processing property to false for
 microbatch

---
 .../spark/sql/execution/streaming/MicroBatchExecution.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index b1cafd67820c..60cb1744c6e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp,
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
@@ -511,6 +512,8 @@ class MicroBatchExecution(
 
     sparkSessionToRunBatch.sparkContext.setLocalProperty(
       MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
+    sparkSessionToRunBatch.sparkContext.setLocalProperty(
+      ContinuousExecution.IS_CONTINUOUS_PROCESSING, false.toString)
 
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
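After patch 3, the property is explicitly "true" under continuous processing and "false" under microbatch, while batch jobs and any other code path leave it unset. The reader side has to cope with all three states, which is why patch 2 wraps the lookup in Option: getLocalProperty returns null when the key was never set. A standalone sketch of the three states (demo names only, not code from the patch):

    object PropertyStates extends App {
      // getLocalProperty returns null for an unset key; Option(...) turns that
      // into None so .map(_.toBoolean) only ever sees a real value.
      def parse(raw: String): Option[Boolean] = Option(raw).map(_.toBoolean)

      println(parse(null))    // None        -> property never set (batch)
      println(parse("false")) // Some(false) -> microbatch, after patch 3
      println(parse("true"))  // Some(true)  -> continuous, after patch 1
    }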
From c2f813bb46bd08ee808ef35ad9569fb9dc7194a6 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Mon, 10 Sep 2018 16:51:32 -0700
Subject: [PATCH 4/6] tests

---
 .../spark/sql/streaming/StreamSuite.scala | 34 +++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index bf509b1976ed..c8f2195ad64d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -29,13 +29,14 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.functions._
@@ -788,7 +789,7 @@ class StreamSuite extends StreamTest {
     val query = input
       .toDS()
       .map { i =>
-        while (!org.apache.spark.TaskContext.get().isInterrupted()) {
+        while (!TaskContext.get().isInterrupted()) {
           // keep looping till interrupted by query.stop()
           Thread.sleep(100)
         }
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
       false))
   }
 
+  test("is_continuous_processing property should be false for microbatch processing") {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+    testStream(df) (
+      AddData(input, 1),
+      CheckAnswer("false")
+    )
+  }
+
+  test("is_continuous_processing property should be true for continuous processing") {
+    val input = ContinuousMemoryStream[Int]
+    var x: String = ""
+    val stream = input.toDS()
+      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+      .writeStream.format("memory")
+      .queryName("output")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+    try {
+      input.addData(1)
+      stream.processAllAvailable()
+    } finally {
+      stream.stop()
+    }
+
+    checkAnswer(spark.sql("select * from output"), Row("true"))
+  }
+
   for (e <- Seq(
     new InterruptedException,
     new InterruptedIOException,
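Assuming the usual Spark development workflow (this command form follows the project's developer-tools documentation and is not part of the patch), the two new tests can be run on their own with sbt and the ScalaTest substring filter:

    build/sbt "sql/testOnly *StreamSuite -- -z is_continuous_processing"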
From 3ebbed3b5ed09638cf5f2b4e31ff28ede7bf9e73 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Tue, 11 Sep 2018 10:32:50 -0700
Subject: [PATCH 5/6] address PR comments

---
 .../spark/sql/execution/streaming/MicroBatchExecution.scala  | 3 +--
 .../spark/sql/execution/streaming/StreamExecution.scala      | 1 +
 .../execution/streaming/continuous/ContinuousExecution.scala | 3 +--
 .../spark/sql/execution/streaming/state/StateStoreRDD.scala  | 5 +++--
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala   | 5 ++---
 5 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 60cb1744c6e7..2cac86599ef1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp,
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
@@ -513,7 +512,7 @@ class MicroBatchExecution(
     sparkSessionToRunBatch.sparkContext.setLocalProperty(
       MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
     sparkSessionToRunBatch.sparkContext.setLocalProperty(
-      ContinuousExecution.IS_CONTINUOUS_PROCESSING, false.toString)
+      StreamExecution.IS_CONTINUOUS_PROCESSING, false.toString)
 
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a39bb715c991..f6c60c1c9212 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -529,6 +529,7 @@ abstract class StreamExecution(
 
 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
+  val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
 
   def isInterruptionException(e: Throwable): Boolean = e match {
     // InterruptedIOException - thrown when an I/O operation is interrupted
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 01bd5cf6271f..ccca72667a21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -210,7 +210,7 @@ class ContinuousExecution(
     }.head
 
     sparkSessionForQuery.sparkContext.setLocalProperty(
-      ContinuousExecution.IS_CONTINUOUS_PROCESSING, true.toString)
+      StreamExecution.IS_CONTINUOUS_PROCESSING, true.toString)
     sparkSessionForQuery.sparkContext.setLocalProperty(
       ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
     // Add another random ID on top of the run ID, to distinguish epoch coordinators across
@@ -393,7 +393,6 @@ class ContinuousExecution(
 }
 
 object ContinuousExecution {
-  val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
   val START_EPOCH_KEY = "__continuous_start_epoch"
   val EPOCH_COORDINATOR_ID_KEY = "__epoch_coordinator_id"
   val EPOCH_INTERVAL_KEY = "__continuous_epoch_interval"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 474ed5717ba8..a194ee956248 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -23,7 +23,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochTracker}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.EpochTracker
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -74,7 +75,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
-    val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+    val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
       .map(_.toBoolean)
     val currentVersion = if (isContinuous.contains(true)) {
       val epoch = EpochTracker.getCurrentEpoch
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c8f2195ad64d..f55ddb5419d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1033,7 +1033,7 @@ test("is_continuous_processing property should be false for microbatch processing") {
     val input = MemoryStream[Int]
     val df = input.toDS()
-      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+      .map(i => TaskContext.get().getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
     testStream(df) (
       AddData(input, 1),
       CheckAnswer("false")
     )
@@ -1042,9 +1042,8 @@
   test("is_continuous_processing property should be true for continuous processing") {
     val input = ContinuousMemoryStream[Int]
-    var x: String = ""
     val stream = input.toDS()
-      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+      .map(i => TaskContext.get().getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
       .writeStream.format("memory")
       .queryName("output")
       .trigger(Trigger.Continuous("1 seconds"))
       .start()
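Patch 6 below tightens the Option handling in StateStoreRDD: collapsing to a plain Boolean with .getOrElse(false) up front reads more directly than carrying an Option[Boolean] and testing .contains(true). The two forms agree on all three input states, as this standalone check shows (demo code, not from the patch):

    object FlagForms extends App {
      def patch5Form(raw: Option[String]): Boolean = raw.map(_.toBoolean).contains(true)
      def patch6Form(raw: Option[String]): Boolean = raw.map(_.toBoolean).getOrElse(false)

      for (raw <- Seq(None, Some("false"), Some("true"))) {
        assert(patch5Form(raw) == patch6Form(raw)) // identical on every state
        println(s"$raw -> ${patch6Form(raw)}")
      }
    }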
From 4d4beef95e18d743e4b8f8bec71b7a6229cc58d3 Mon Sep 17 00:00:00 2001
From: Mukul Murthy
Date: Tue, 11 Sep 2018 11:55:54 -0700
Subject: [PATCH 6/6] TD's comment

---
 .../spark/sql/execution/streaming/state/StateStoreRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index a194ee956248..4a69a48fed75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -76,8 +76,8 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     // If we're in continuous processing mode, we should get the store version for the current
     // epoch rather than the one at planning time.
     val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
-      .map(_.toBoolean)
-    val currentVersion = if (isContinuous.contains(true)) {
+      .map(_.toBoolean).getOrElse(false)
+    val currentVersion = if (isContinuous) {
       val epoch = EpochTracker.getCurrentEpoch
       assert(epoch.isDefined, "Current epoch must be defined for continuous processing streams.")
       epoch.get
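Net effect of the series, condensed into one standalone function (a sketch of StateStoreRDD's version selection as it stands after patch 6; the object wrapper and parameter names are illustrative, not the real class):

    import org.apache.spark.TaskContext

    object VersionSelection {
      // Mirrors StreamExecution.IS_CONTINUOUS_PROCESSING after patch 5.
      val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"

      // storeVersion is the version fixed at planning time; currentEpoch
      // stands in for EpochTracker.getCurrentEpoch on the executor.
      def versionToLoad(ctxt: TaskContext, storeVersion: Long,
                        currentEpoch: => Option[Long]): Long = {
        val isContinuous = Option(ctxt.getLocalProperty(IS_CONTINUOUS_PROCESSING))
          .map(_.toBoolean).getOrElse(false)
        if (isContinuous) {
          // Continuous mode: the epoch tracker, not the planned version, wins.
          val epoch = currentEpoch
          assert(epoch.isDefined, "Current epoch must be defined for continuous processing streams.")
          epoch.get
        } else {
          // Microbatch (property "false") and batch (property unset) both
          // fall back to the version chosen at planning time.
          storeVersion
        }
      }
    }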