From 29d184c1e39c8c9203ff1b5988e7f429680fce2d Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 21 Dec 2017 10:54:14 -0600
Subject: [PATCH 1/2] [SPARK-22861][SQL] SQLAppStatusListener handles
 multi-job executions.

When one execution has multiple jobs, we need to append to the set of
stages rather than replace it on every job start.
---
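
A quick standalone sketch of the behavior change (kept below the ---
so it stays out of the commit message). LiveExecutionData, job0StageIds,
and job1StageIds are illustrative stand-ins, not the listener's real
internals:

    // Sketch only: shows why assigning the stage set on each job start
    // drops stages from earlier jobs of the same execution, while
    // appending accumulates them.
    object StageAccumulationSketch {
      // Stand-in for the live execution entity the listener mutates.
      final class LiveExecutionData {
        var stages: Set[Int] = Set()
      }

      def main(args: Array[String]): Unit = {
        val exec = new LiveExecutionData
        val job0StageIds = Seq(0, 1) // first job of the execution
        val job1StageIds = Seq(2, 3) // second job, same execution

        // Old behavior: each job start replaced the set, losing job 0's stages.
        exec.stages = job0StageIds.toSet
        exec.stages = job1StageIds.toSet
        assert(exec.stages == Set(2, 3))

        // Fixed behavior: append, so the execution keeps the stages of all jobs.
        exec.stages = Set()
        exec.stages ++= job0StageIds.toSet
        exec.stages ++= job1StageIds.toSet
        assert(exec.stages == Set(0, 1, 2, 3))
        println("accumulated stages: " + exec.stages.toSeq.sorted.mkString(", "))
      }
    }
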
 .../execution/ui/SQLAppStatusListener.scala   |  3 +-
 .../sql/execution/ui/SQLListenerSuite.scala   | 40 +++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index cf0000c6393a..cf4c2a2fb996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
 import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.kvstore.KVStore
 
 private[sql] class SQLAppStatusListener(
     conf: SparkConf,
@@ -88,7 +87,7 @@ private[sql] class SQLAppStatusListener(
     }
 
     exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
-    exec.stages = event.stageIds.toSet
+    exec.stages ++= event.stageIds.toSet
     update(exec)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 932950687942..8c16d46bffaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -361,6 +361,46 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
     assertJobs(store.execution(0), failed = Seq(0))
   }
 
+  sqlStoreTest("handle one execution with multiple jobs") { (store, bus) =>
+    val executionId = 0
+    val df = createTestDataFrame
+    bus.postToAll(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    var stageId = 0
+    def twoStageJob(jobId: Int): Unit = {
+      val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0) }
+      stageId += 2
+      bus.postToAll(SparkListenerJobStart(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        stageInfos = stages,
+        createProperties(executionId)))
+      stages.foreach { s =>
+        bus.postToAll(SparkListenerStageSubmitted(s))
+        bus.postToAll(SparkListenerStageCompleted(s))
+      }
+      bus.postToAll(SparkListenerJobEnd(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        JobSucceeded
+      ))
+    }
+    // submit two jobs with the same executionId
+    twoStageJob(0)
+    twoStageJob(1)
+    bus.postToAll(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    assertJobs(store.execution(0), completed = 0 to 1)
+    assert(store.execution(0).get.stages === (0 to 3).toSet)
+  }
+
   test("SPARK-11126: no memory leak when running non SQL jobs") {
     val previousStageNumber = statusStore.executionsList().size
     spark.sparkContext.parallelize(1 to 10).foreach(i => ())

From 89e1c6d4cb75dfdadf3de768754fc92c985f4d73 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 21 Dec 2017 13:26:10 -0600
Subject: [PATCH 2/2] fix merge issues

---
 .../ui/SQLAppStatusListenerSuite.scala        | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 78809354efd5..7d84f45d36be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -383,10 +383,13 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }
 
-  sqlStoreTest("handle one execution with multiple jobs") { (store, bus) =>
+  test("handle one execution with multiple jobs") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
     val executionId = 0
     val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
@@ -398,16 +401,16 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     def twoStageJob(jobId: Int): Unit = {
       val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0) }
       stageId += 2
-      bus.postToAll(SparkListenerJobStart(
+      listener.onJobStart(SparkListenerJobStart(
         jobId = jobId,
         time = System.currentTimeMillis(),
         stageInfos = stages,
         createProperties(executionId)))
       stages.foreach { s =>
-        bus.postToAll(SparkListenerStageSubmitted(s))
-        bus.postToAll(SparkListenerStageCompleted(s))
+        listener.onStageSubmitted(SparkListenerStageSubmitted(s))
+        listener.onStageCompleted(SparkListenerStageCompleted(s))
       }
-      bus.postToAll(SparkListenerJobEnd(
+      listener.onJobEnd(SparkListenerJobEnd(
         jobId = jobId,
         time = System.currentTimeMillis(),
         JobSucceeded
@@ -416,11 +419,11 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     // submit two jobs with the same executionId
     twoStageJob(0)
     twoStageJob(1)
-    bus.postToAll(SparkListenerSQLExecutionEnd(
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
       executionId, System.currentTimeMillis()))
 
-    assertJobs(store.execution(0), completed = 0 to 1)
-    assert(store.execution(0).get.stages === (0 to 3).toSet)
+    assertJobs(statusStore.execution(0), completed = 0 to 1)
+    assert(statusStore.execution(0).get.stages === (0 to 3).toSet)
   }
 
   test("SPARK-11126: no memory leak when running non SQL jobs") {