From 0e5e52f2c6b934a372d098a0d7780da18d3f99e0 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 22 Feb 2018 10:39:13 -0800
Subject: [PATCH 1/4] remove job group cancel from continuous processing

---
 .../execution/streaming/continuous/ContinuousExecution.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 2c1d6c509d21..8a28f095d9b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -238,7 +238,6 @@ class ContinuousExecution(
             if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
               stopSources()
               if (queryExecutionThread.isAlive) {
-                sparkSession.sparkContext.cancelJobGroup(runId.toString)
                 queryExecutionThread.interrupt()
               }
               false
@@ -266,6 +265,10 @@ class ContinuousExecution(
         SQLExecution.withNewExecutionId(
           sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
       }
+    } catch {
+      case t: Throwable
+          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+        // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
       SparkEnv.get.rpcEnv.stop(epochEndpoint)

From ae2d853fceb25c09efd772d3bb8802982bc86331 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 22 Feb 2018 12:59:24 -0800
Subject: [PATCH 2/4] add stop/cancel

---
 .../execution/streaming/continuous/ContinuousExecution.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 8a28f095d9b4..b7c341ac9c34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -236,7 +236,6 @@ class ContinuousExecution(
             startTrigger()
 
             if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
-              stopSources()
               if (queryExecutionThread.isAlive) {
                 queryExecutionThread.interrupt()
               }
@@ -268,6 +267,8 @@ class ContinuousExecution(
     } catch {
       case t: Throwable
           if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+        stopSources()
+        sparkSession.sparkContext.cancelJobGroup(runId.toString)
         // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)

From d404bafa869a4950d607200586c88ea17d99f9f5 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Fri, 23 Feb 2018 15:27:34 -0800
Subject: [PATCH 3/4] move stops to finally

---
 .../execution/streaming/continuous/ContinuousExecution.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index b7c341ac9c34..aa06742d8720 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -267,8 +267,6 @@ class ContinuousExecution(
     } catch {
       case t: Throwable
           if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
-        stopSources()
-        sparkSession.sparkContext.cancelJobGroup(runId.toString)
         // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
@@ -276,6 +274,9 @@ class ContinuousExecution(
 
       epochUpdateThread.interrupt()
       epochUpdateThread.join()
+
+      stopSources()
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
   }
 

From d3b16c11671ba6514360121556ed5554f8bcf890 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Fri, 23 Feb 2018 15:37:54 -0800
Subject: [PATCH 4/4] add log

---
 .../sql/execution/streaming/continuous/ContinuousExecution.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index aa06742d8720..daebd1dd010a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -267,6 +267,7 @@ class ContinuousExecution(
     } catch {
       case t: Throwable
           if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
+        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
         // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
       epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)