From 56b91a321c30d0908f255f4acb22854751c4dfe7 Mon Sep 17 00:00:00 2001 From: yuanfuyuan <1406957364@qq.com> Date: Tue, 10 Oct 2023 22:48:13 +0800 Subject: [PATCH 01/35] fix_4186 --- .../spark/kyuubi/SQLOperationListener.scala | 67 ++++++++++++------- .../kyuubi/SparkConsoleProgressBar.scala | 18 ++++- .../org/apache/spark/kyuubi/StageStatus.scala | 5 ++ 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 4e4a940d295..c362af72870 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -44,7 +44,7 @@ class SQLOperationListener( spark: SparkSession) extends StatsReportListener with Logging { private val operationId: String = operation.getHandle.identifier.toString - private lazy val activeJobs = new java.util.HashSet[Int]() + private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]() private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]() private var executionId: Option[Long] = None @@ -53,6 +53,7 @@ class SQLOperationListener( if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) { Some(new SparkConsoleProgressBar( operation, + activeJobs, activeStages, conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL), conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT))) @@ -79,37 +80,45 @@ class SQLOperationListener( } } - override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized { - if (sameGroupId(jobStart.properties)) { - val jobId = jobStart.jobId - val stageSize = jobStart.stageInfos.size - if (executionId.isEmpty) { - executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) - .map(_.toLong) - consoleProgressBar - operation match { - case executeStatement: ExecuteStatement => - executeStatement.setCompiledStateIfNeeded() - case _ => + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + activeJobs.synchronized { + if (sameGroupId(jobStart.properties)) { + val jobId = jobStart.jobId + val stageIds = jobStart.stageInfos.map(_.stageId) + val stageSize = jobStart.stageInfos.size + if (executionId.isEmpty) { + executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) + .map(_.toLong) + consoleProgressBar + operation match { + case executeStatement: ExecuteStatement => + executeStatement.setCompiledStateIfNeeded() + case _ => + } + } + activeJobs.put( + jobId, + new SparkJobInfo(stageSize, stageIds) + ) + withOperationLog { + info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + + s" ${activeJobs.size()} active jobs running") } - } - withOperationLog { - activeJobs.add(jobId) - info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + - s" ${activeJobs.size()} active jobs running") } } } override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized { val jobId = jobEnd.jobId - if (activeJobs.remove(jobId)) { - val hint = jobEnd.jobResult match { - case JobSucceeded => "succeeded" - case _ => "failed" // TODO: Handle JobFailed(exception: Exception) - } - withOperationLog { - info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") + activeJobs.synchronized { + if (activeJobs.remove(jobId) != null) { + val hint = jobEnd.jobResult match { + case JobSucceeded => "succeeded" + case _ => "failed" // TODO: Handle JobFailed(exception: Exception) + } + withOperationLog { + info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") + } } } } @@ -134,9 +143,17 @@ class SQLOperationListener( override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageInfo = stageCompleted.stageInfo + val stageId = stageInfo.stageId val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { + activeJobs.synchronized { + activeJobs.forEach((jobId, sparkJobInfo) => { + if (sparkJobInfo.stageIds.contains(stageId)) { + sparkJobInfo.numCompleteStages.getAndIncrement() + } + }) + } withOperationLog(super.onStageCompleted(stageCompleted)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index dc8b493cc04..27b41c2b8b4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation class SparkConsoleProgressBar( operation: Operation, + liveJobs: ConcurrentHashMap[Int, SparkJobInfo], liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo], updatePeriodMSec: Long, timeFormat: String) @@ -71,7 +72,14 @@ class SparkConsoleProgressBar( show(now, stages.take(3)) // display at most 3 stages in same time } } - + private def findJobId(stageId: Int): Int = { + liveJobs.forEach((jobId, sparkJobInfo) => { + if (sparkJobInfo.stageIds.contains(stageId)) { + return jobId + } + }) + -1 + } /** * Show progress bar in console. The progress bar is displayed in the next line * after your last output, keeps overwriting itself to hold in one line. The logging will follow @@ -81,7 +89,13 @@ class SparkConsoleProgressBar( val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks - val header = s"[Stage ${s.stageId}:" + val jobId = findJobId(s.stageId) + var jobHeader = s"[There is no job about this stage]" + if (jobId != -1) { + jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + + s"/ ${liveJobs.get(jobId).numStages}) Stages] " + } + val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" val w = width - header.length - tailer.length val bar = diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 2ea9c3fdae6..cd745ea1921 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -27,3 +27,8 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { var numActiveTasks = new AtomicInteger(0) var numCompleteTasks = new AtomicInteger(0) } + + +class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { + var numCompleteStages = new AtomicInteger(0) +} \ No newline at end of file From 6209c344ebaae22d8aceaab973e7e5b744f53b85 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 10 Oct 2023 23:53:58 +0800 Subject: [PATCH 02/35] [Improvement] spark showProgress can briefly output info of the job #4186 --- .../org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 27b41c2b8b4..a7d5afda8cb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -72,6 +72,12 @@ class SparkConsoleProgressBar( show(now, stages.take(3)) // display at most 3 stages in same time } } + + /** + * Use stageId to find stage's jobId + * @param stageId stageId + * @return jobId,if not exists, return -1 + */ private def findJobId(stageId: Int): Int = { liveJobs.forEach((jobId, sparkJobInfo) => { if (sparkJobInfo.stageIds.contains(stageId)) { From d30492e46dd332ba74588d4fc077f0eb2882b389 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Thu, 12 Oct 2023 18:35:53 +0800 Subject: [PATCH 03/35] add showProgress with jobInfo Unit Test --- .../kyuubi/SQLOperationListenerSuite.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index 04277fca4a2..43ad7170565 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -28,10 +28,34 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { - override def withKyuubiConf: Map[String, String] = Map.empty + override def withKyuubiConf: Map[String, String] = Map( + "kyuubi.session.engine.spark.showProgress" -> "true", + "kyuubi.session.engine.spark.progress.update.interval" -> "200") + override protected def jdbcUrl: String = getJdbcUrl + + + test("operation listener with progress job info") { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 5000l) FROM range(1, 3, 1, 2);" + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val tExecuteStatementResp = client.ExecuteStatement(req) + val opHandle = tExecuteStatementResp.getOperationHandle + val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000) + fetchResultsReq.setFetchType(1.toShort) + eventually(timeout(90.seconds), interval(500.milliseconds)) { + val resultsResp = client.FetchResults(fetchResultsReq) + val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala + print(logs) + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + } + } + } + test("operation listener") { val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" withSessionHandle { (client, handle) => From 32ad0759b67a933fd801a3eed8e4bf627a80d749 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Thu, 12 Oct 2023 18:45:34 +0800 Subject: [PATCH 04/35] add showProgress with jobInfo Unit Test --- .../org/apache/spark/kyuubi/SQLOperationListenerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index 43ad7170565..d9938236f2b 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -38,7 +38,7 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 5000l) FROM range(1, 3, 1, 2);" + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -50,7 +50,6 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - print(logs) assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) } } From 4564ef98f177c753b904bed5858ab036c1a4ed8d Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 00:08:26 +0800 Subject: [PATCH 05/35] improvement_add_job_log fix 1. remove duplicate synchronized 2. because the activeJobs is ConcurrentHashMap, so reduce synchronized 3. fix scala code style 4. change forEach to asScala code style 5. change conf str to KyuubiConf.XXX.key --- .../spark/kyuubi/SQLOperationListener.scala | 73 +++++++++---------- .../kyuubi/SparkConsoleProgressBar.scala | 15 ++-- .../org/apache/spark/kyuubi/StageStatus.scala | 1 - .../kyuubi/SQLOperationListenerSuite.scala | 8 +- 4 files changed, 46 insertions(+), 51 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index c362af72870..f22449d8fa4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -20,6 +20,8 @@ package org.apache.spark.kyuubi import java.util.Properties import java.util.concurrent.ConcurrentHashMap +import scala.jdk.CollectionConverters._ + import org.apache.spark.scheduler._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd @@ -81,44 +83,39 @@ class SQLOperationListener( } override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - activeJobs.synchronized { - if (sameGroupId(jobStart.properties)) { - val jobId = jobStart.jobId - val stageIds = jobStart.stageInfos.map(_.stageId) - val stageSize = jobStart.stageInfos.size - if (executionId.isEmpty) { - executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) - .map(_.toLong) - consoleProgressBar - operation match { - case executeStatement: ExecuteStatement => - executeStatement.setCompiledStateIfNeeded() - case _ => - } - } - activeJobs.put( - jobId, - new SparkJobInfo(stageSize, stageIds) - ) - withOperationLog { - info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + - s" ${activeJobs.size()} active jobs running") + if (sameGroupId(jobStart.properties)) { + val jobId = jobStart.jobId + val stageIds = jobStart.stageInfos.map(_.stageId) + val stageSize = jobStart.stageInfos.size + if (executionId.isEmpty) { + executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) + .map(_.toLong) + consoleProgressBar + operation match { + case executeStatement: ExecuteStatement => + executeStatement.setCompiledStateIfNeeded() + case _ => } } + activeJobs.put( + jobId, + new SparkJobInfo(stageSize, stageIds)) + withOperationLog { + info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + + s" ${activeJobs.size()} active jobs running") + } } } - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { val jobId = jobEnd.jobId - activeJobs.synchronized { - if (activeJobs.remove(jobId) != null) { - val hint = jobEnd.jobResult match { - case JobSucceeded => "succeeded" - case _ => "failed" // TODO: Handle JobFailed(exception: Exception) - } - withOperationLog { - info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") - } + if (activeJobs.remove(jobId) != null) { + val hint = jobEnd.jobResult match { + case JobSucceeded => "succeeded" + case _ => "failed" // TODO: Handle JobFailed(exception: Exception) + } + withOperationLog { + info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") } } } @@ -147,13 +144,11 @@ class SQLOperationListener( val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { - activeJobs.synchronized { - activeJobs.forEach((jobId, sparkJobInfo) => { - if (sparkJobInfo.stageIds.contains(stageId)) { - sparkJobInfo.numCompleteStages.getAndIncrement() - } - }) - } + activeJobs.asScala.foreach(item => { + if (item._2.stageIds.contains(stageId)) { + item._2.numCompleteStages.getAndIncrement() + } + }) withOperationLog(super.onStageCompleted(stageCompleted)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index a7d5afda8cb..bea209b0016 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -79,12 +79,15 @@ class SparkConsoleProgressBar( * @return jobId,if not exists, return -1 */ private def findJobId(stageId: Int): Int = { - liveJobs.forEach((jobId, sparkJobInfo) => { - if (sparkJobInfo.stageIds.contains(stageId)) { - return jobId - } - }) - -1 + val result: Option[Int] = liveJobs.asScala.find(item => { + item._2.stageIds.contains(stageId) + }).map(_._1) + result match { + case Some(value) => + value + case None => + -1 + } } /** * Show progress bar in console. The progress bar is displayed in the next line diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index cd745ea1921..ea7ed0c9c4d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -28,7 +28,6 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { var numCompleteTasks = new AtomicInteger(0) } - class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { var numCompleteStages = new AtomicInteger(0) } \ No newline at end of file diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index d9938236f2b..f3bf6f311ec 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters.asScalaBufferConverter import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TOperationHandle} import org.scalatest.time.SpanSugar._ +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.OPERATION_SPARK_LISTENER_ENABLED import org.apache.kyuubi.engine.spark.WithSparkSQLEngine import org.apache.kyuubi.operation.HiveJDBCTestHelper @@ -29,14 +30,11 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { override def withKyuubiConf: Map[String, String] = Map( - "kyuubi.session.engine.spark.showProgress" -> "true", - "kyuubi.session.engine.spark.progress.update.interval" -> "200") - + KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true", + KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200") override protected def jdbcUrl: String = getJdbcUrl - - test("operation listener with progress job info") { val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => From e15fc7195407d12088aa766565828b30a3aa8420 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:29:24 +0800 Subject: [PATCH 06/35] improvement_add_job_log fix 1. fix new end line 2. provide Option[Int] with JobId --- .../spark/kyuubi/SparkConsoleProgressBar.scala | 15 +++++---------- .../org/apache/spark/kyuubi/StageStatus.scala | 2 +- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index bea209b0016..4b431da435d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -78,15 +78,9 @@ class SparkConsoleProgressBar( * @param stageId stageId * @return jobId,if not exists, return -1 */ - private def findJobId(stageId: Int): Int = { - val result: Option[Int] = liveJobs.asScala.find(item => { - item._2.stageIds.contains(stageId) - }).map(_._1) - result match { - case Some(value) => - value - case None => - -1 + private def findJobId(stageId: Int): Option[Int] = { + liveJobs.asScala.collectFirst { + case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId } } /** @@ -98,7 +92,8 @@ class SparkConsoleProgressBar( val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks - val jobId = findJobId(s.stageId) + val result = findJobId(s.stageId) + val jobId = result.getOrElse(-1) var jobHeader = s"[There is no job about this stage]" if (jobId != -1) { jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index ea7ed0c9c4d..1803094edc8 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -30,4 +30,4 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { var numCompleteStages = new AtomicInteger(0) -} \ No newline at end of file +} From 249a422b6c686f09644717e4a4f1279b0eb1d9ff Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:32:40 +0800 Subject: [PATCH 07/35] improvement_add_job_log fix 1. provide Option[Int] with JobId --- .../org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 4b431da435d..a3bad236bda 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -93,11 +93,13 @@ class SparkConsoleProgressBar( val bar = stages.map { s => val total = s.numTasks val result = findJobId(s.stageId) - val jobId = result.getOrElse(-1) var jobHeader = s"[There is no job about this stage]" - if (jobId != -1) { + if (result.isDefined) { + val jobId = result.get jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + s"/ ${liveJobs.get(jobId).numStages}) Stages] " + } else { + jobHeader = s"[No job match the stage]" } val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" From 5cf8714e04e9ad49406b2021b283a2241c24b105 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:32:58 +0800 Subject: [PATCH 08/35] improvement_add_job_log fix 1. provide Option[Int] with JobId --- .../scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index a3bad236bda..c79df48dbd5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -98,8 +98,6 @@ class SparkConsoleProgressBar( val jobId = result.get jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + s"/ ${liveJobs.get(jobId).numStages}) Stages] " - } else { - jobHeader = s"[No job match the stage]" } val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" From 49debfbe3bd82838efe510ce30207cc2649e0c20 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:49:37 +0800 Subject: [PATCH 09/35] improvement_add_job_log fix 1. provide Option[Int] with JobId --- .../scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index c79df48dbd5..195f1418c13 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -83,6 +83,7 @@ class SparkConsoleProgressBar( case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId } } + /** * Show progress bar in console. The progress bar is displayed in the next line * after your last output, keeps overwriting itself to hold in one line. The logging will follow From 9fa8e73fc99bd8287e3f96cddbf185e86a60e9cb Mon Sep 17 00:00:00 2001 From: davidyuan Date: Mon, 16 Oct 2023 16:24:38 +0800 Subject: [PATCH 10/35] add showProgress with jobInfo Unit Test --- .../kyuubi/SQLOperationListenerSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index f3bf6f311ec..f732f7c3846 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -35,8 +35,8 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp override protected def jdbcUrl: String = getJdbcUrl - test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" + test("operation listener") { + val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -48,13 +48,17 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + assert(logs.exists(_.contains("started with 2 stages"))) + assert(logs.exists(_.contains("started with 1 tasks"))) + assert(logs.exists(_.contains("started with 3 tasks"))) + assert(logs.exists(_.contains("Finished stage:"))) + assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) } } } - test("operation listener") { - val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" + test("operation listener with progress job info") { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -66,11 +70,7 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.contains("started with 2 stages"))) - assert(logs.exists(_.contains("started with 1 tasks"))) - assert(logs.exists(_.contains("started with 3 tasks"))) - assert(logs.exists(_.contains("Finished stage:"))) - assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) } } } From d4bdec7982dd31e99a67634923360176c97e3d1e Mon Sep 17 00:00:00 2001 From: yuanfuyuan <1406957364@qq.com> Date: Tue, 10 Oct 2023 22:48:13 +0800 Subject: [PATCH 11/35] fix_4186 --- .../spark/kyuubi/SQLOperationListener.scala | 67 ++++++++++++------- .../kyuubi/SparkConsoleProgressBar.scala | 18 ++++- .../org/apache/spark/kyuubi/StageStatus.scala | 5 ++ 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 4e4a940d295..c362af72870 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -44,7 +44,7 @@ class SQLOperationListener( spark: SparkSession) extends StatsReportListener with Logging { private val operationId: String = operation.getHandle.identifier.toString - private lazy val activeJobs = new java.util.HashSet[Int]() + private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]() private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]() private var executionId: Option[Long] = None @@ -53,6 +53,7 @@ class SQLOperationListener( if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) { Some(new SparkConsoleProgressBar( operation, + activeJobs, activeStages, conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL), conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT))) @@ -79,37 +80,45 @@ class SQLOperationListener( } } - override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized { - if (sameGroupId(jobStart.properties)) { - val jobId = jobStart.jobId - val stageSize = jobStart.stageInfos.size - if (executionId.isEmpty) { - executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) - .map(_.toLong) - consoleProgressBar - operation match { - case executeStatement: ExecuteStatement => - executeStatement.setCompiledStateIfNeeded() - case _ => + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + activeJobs.synchronized { + if (sameGroupId(jobStart.properties)) { + val jobId = jobStart.jobId + val stageIds = jobStart.stageInfos.map(_.stageId) + val stageSize = jobStart.stageInfos.size + if (executionId.isEmpty) { + executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) + .map(_.toLong) + consoleProgressBar + operation match { + case executeStatement: ExecuteStatement => + executeStatement.setCompiledStateIfNeeded() + case _ => + } + } + activeJobs.put( + jobId, + new SparkJobInfo(stageSize, stageIds) + ) + withOperationLog { + info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + + s" ${activeJobs.size()} active jobs running") } - } - withOperationLog { - activeJobs.add(jobId) - info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + - s" ${activeJobs.size()} active jobs running") } } } override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized { val jobId = jobEnd.jobId - if (activeJobs.remove(jobId)) { - val hint = jobEnd.jobResult match { - case JobSucceeded => "succeeded" - case _ => "failed" // TODO: Handle JobFailed(exception: Exception) - } - withOperationLog { - info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") + activeJobs.synchronized { + if (activeJobs.remove(jobId) != null) { + val hint = jobEnd.jobResult match { + case JobSucceeded => "succeeded" + case _ => "failed" // TODO: Handle JobFailed(exception: Exception) + } + withOperationLog { + info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") + } } } } @@ -134,9 +143,17 @@ class SQLOperationListener( override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageInfo = stageCompleted.stageInfo + val stageId = stageInfo.stageId val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { + activeJobs.synchronized { + activeJobs.forEach((jobId, sparkJobInfo) => { + if (sparkJobInfo.stageIds.contains(stageId)) { + sparkJobInfo.numCompleteStages.getAndIncrement() + } + }) + } withOperationLog(super.onStageCompleted(stageCompleted)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index dc8b493cc04..27b41c2b8b4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation class SparkConsoleProgressBar( operation: Operation, + liveJobs: ConcurrentHashMap[Int, SparkJobInfo], liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo], updatePeriodMSec: Long, timeFormat: String) @@ -71,7 +72,14 @@ class SparkConsoleProgressBar( show(now, stages.take(3)) // display at most 3 stages in same time } } - + private def findJobId(stageId: Int): Int = { + liveJobs.forEach((jobId, sparkJobInfo) => { + if (sparkJobInfo.stageIds.contains(stageId)) { + return jobId + } + }) + -1 + } /** * Show progress bar in console. The progress bar is displayed in the next line * after your last output, keeps overwriting itself to hold in one line. The logging will follow @@ -81,7 +89,13 @@ class SparkConsoleProgressBar( val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks - val header = s"[Stage ${s.stageId}:" + val jobId = findJobId(s.stageId) + var jobHeader = s"[There is no job about this stage]" + if (jobId != -1) { + jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + + s"/ ${liveJobs.get(jobId).numStages}) Stages] " + } + val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" val w = width - header.length - tailer.length val bar = diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 2ea9c3fdae6..cd745ea1921 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -27,3 +27,8 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { var numActiveTasks = new AtomicInteger(0) var numCompleteTasks = new AtomicInteger(0) } + + +class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { + var numCompleteStages = new AtomicInteger(0) +} \ No newline at end of file From c07535a011b113ebfd2f578a51fb218ae96f4b3a Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 10 Oct 2023 23:53:58 +0800 Subject: [PATCH 12/35] [Improvement] spark showProgress can briefly output info of the job #4186 --- .../org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 27b41c2b8b4..a7d5afda8cb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -72,6 +72,12 @@ class SparkConsoleProgressBar( show(now, stages.take(3)) // display at most 3 stages in same time } } + + /** + * Use stageId to find stage's jobId + * @param stageId stageId + * @return jobId,if not exists, return -1 + */ private def findJobId(stageId: Int): Int = { liveJobs.forEach((jobId, sparkJobInfo) => { if (sparkJobInfo.stageIds.contains(stageId)) { From af05089d49b33edfc8172ace5a17ccaef735365f Mon Sep 17 00:00:00 2001 From: davidyuan Date: Thu, 12 Oct 2023 18:35:53 +0800 Subject: [PATCH 13/35] add showProgress with jobInfo Unit Test --- .../kyuubi/SQLOperationListenerSuite.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index 04277fca4a2..43ad7170565 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -28,10 +28,34 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { - override def withKyuubiConf: Map[String, String] = Map.empty + override def withKyuubiConf: Map[String, String] = Map( + "kyuubi.session.engine.spark.showProgress" -> "true", + "kyuubi.session.engine.spark.progress.update.interval" -> "200") + override protected def jdbcUrl: String = getJdbcUrl + + + test("operation listener with progress job info") { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 5000l) FROM range(1, 3, 1, 2);" + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val tExecuteStatementResp = client.ExecuteStatement(req) + val opHandle = tExecuteStatementResp.getOperationHandle + val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000) + fetchResultsReq.setFetchType(1.toShort) + eventually(timeout(90.seconds), interval(500.milliseconds)) { + val resultsResp = client.FetchResults(fetchResultsReq) + val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala + print(logs) + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + } + } + } + test("operation listener") { val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" withSessionHandle { (client, handle) => From 59340b7136a7d22cf651df10a17a749e4546c914 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Thu, 12 Oct 2023 18:45:34 +0800 Subject: [PATCH 14/35] add showProgress with jobInfo Unit Test --- .../org/apache/spark/kyuubi/SQLOperationListenerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index 43ad7170565..d9938236f2b 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -38,7 +38,7 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 5000l) FROM range(1, 3, 1, 2);" + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -50,7 +50,6 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - print(logs) assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) } } From 780f9d15ea2315b5cf406c3c75e60ff67d5d43d8 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 00:08:26 +0800 Subject: [PATCH 15/35] improvement_add_job_log fix 1. remove duplicate synchronized 2. because the activeJobs is ConcurrentHashMap, so reduce synchronized 3. fix scala code style 4. change forEach to asScala code style 5. change conf str to KyuubiConf.XXX.key --- .../spark/kyuubi/SQLOperationListener.scala | 73 +++++++++---------- .../kyuubi/SparkConsoleProgressBar.scala | 15 ++-- .../org/apache/spark/kyuubi/StageStatus.scala | 1 - .../kyuubi/SQLOperationListenerSuite.scala | 8 +- 4 files changed, 46 insertions(+), 51 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index c362af72870..f22449d8fa4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -20,6 +20,8 @@ package org.apache.spark.kyuubi import java.util.Properties import java.util.concurrent.ConcurrentHashMap +import scala.jdk.CollectionConverters._ + import org.apache.spark.scheduler._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd @@ -81,44 +83,39 @@ class SQLOperationListener( } override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - activeJobs.synchronized { - if (sameGroupId(jobStart.properties)) { - val jobId = jobStart.jobId - val stageIds = jobStart.stageInfos.map(_.stageId) - val stageSize = jobStart.stageInfos.size - if (executionId.isEmpty) { - executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) - .map(_.toLong) - consoleProgressBar - operation match { - case executeStatement: ExecuteStatement => - executeStatement.setCompiledStateIfNeeded() - case _ => - } - } - activeJobs.put( - jobId, - new SparkJobInfo(stageSize, stageIds) - ) - withOperationLog { - info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + - s" ${activeJobs.size()} active jobs running") + if (sameGroupId(jobStart.properties)) { + val jobId = jobStart.jobId + val stageIds = jobStart.stageInfos.map(_.stageId) + val stageSize = jobStart.stageInfos.size + if (executionId.isEmpty) { + executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) + .map(_.toLong) + consoleProgressBar + operation match { + case executeStatement: ExecuteStatement => + executeStatement.setCompiledStateIfNeeded() + case _ => } } + activeJobs.put( + jobId, + new SparkJobInfo(stageSize, stageIds)) + withOperationLog { + info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + + s" ${activeJobs.size()} active jobs running") + } } } - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { val jobId = jobEnd.jobId - activeJobs.synchronized { - if (activeJobs.remove(jobId) != null) { - val hint = jobEnd.jobResult match { - case JobSucceeded => "succeeded" - case _ => "failed" // TODO: Handle JobFailed(exception: Exception) - } - withOperationLog { - info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") - } + if (activeJobs.remove(jobId) != null) { + val hint = jobEnd.jobResult match { + case JobSucceeded => "succeeded" + case _ => "failed" // TODO: Handle JobFailed(exception: Exception) + } + withOperationLog { + info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") } } } @@ -147,13 +144,11 @@ class SQLOperationListener( val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { - activeJobs.synchronized { - activeJobs.forEach((jobId, sparkJobInfo) => { - if (sparkJobInfo.stageIds.contains(stageId)) { - sparkJobInfo.numCompleteStages.getAndIncrement() - } - }) - } + activeJobs.asScala.foreach(item => { + if (item._2.stageIds.contains(stageId)) { + item._2.numCompleteStages.getAndIncrement() + } + }) withOperationLog(super.onStageCompleted(stageCompleted)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index a7d5afda8cb..bea209b0016 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -79,12 +79,15 @@ class SparkConsoleProgressBar( * @return jobId,if not exists, return -1 */ private def findJobId(stageId: Int): Int = { - liveJobs.forEach((jobId, sparkJobInfo) => { - if (sparkJobInfo.stageIds.contains(stageId)) { - return jobId - } - }) - -1 + val result: Option[Int] = liveJobs.asScala.find(item => { + item._2.stageIds.contains(stageId) + }).map(_._1) + result match { + case Some(value) => + value + case None => + -1 + } } /** * Show progress bar in console. The progress bar is displayed in the next line diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index cd745ea1921..ea7ed0c9c4d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -28,7 +28,6 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { var numCompleteTasks = new AtomicInteger(0) } - class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { var numCompleteStages = new AtomicInteger(0) } \ No newline at end of file diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index d9938236f2b..f3bf6f311ec 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters.asScalaBufferConverter import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TOperationHandle} import org.scalatest.time.SpanSugar._ +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.OPERATION_SPARK_LISTENER_ENABLED import org.apache.kyuubi.engine.spark.WithSparkSQLEngine import org.apache.kyuubi.operation.HiveJDBCTestHelper @@ -29,14 +30,11 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { override def withKyuubiConf: Map[String, String] = Map( - "kyuubi.session.engine.spark.showProgress" -> "true", - "kyuubi.session.engine.spark.progress.update.interval" -> "200") - + KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true", + KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200") override protected def jdbcUrl: String = getJdbcUrl - - test("operation listener with progress job info") { val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => From 5b4aaa8b5265aedafa145cd4f8c4886ca3f2d980 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:29:24 +0800 Subject: [PATCH 16/35] improvement_add_job_log fix 1. fix new end line 2. provide Option[Int] with JobId --- .../spark/kyuubi/SparkConsoleProgressBar.scala | 15 +++++---------- .../org/apache/spark/kyuubi/StageStatus.scala | 2 +- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index bea209b0016..4b431da435d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -78,15 +78,9 @@ class SparkConsoleProgressBar( * @param stageId stageId * @return jobId,if not exists, return -1 */ - private def findJobId(stageId: Int): Int = { - val result: Option[Int] = liveJobs.asScala.find(item => { - item._2.stageIds.contains(stageId) - }).map(_._1) - result match { - case Some(value) => - value - case None => - -1 + private def findJobId(stageId: Int): Option[Int] = { + liveJobs.asScala.collectFirst { + case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId } } /** @@ -98,7 +92,8 @@ class SparkConsoleProgressBar( val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks - val jobId = findJobId(s.stageId) + val result = findJobId(s.stageId) + val jobId = result.getOrElse(-1) var jobHeader = s"[There is no job about this stage]" if (jobId != -1) { jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index ea7ed0c9c4d..1803094edc8 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -30,4 +30,4 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { var numCompleteStages = new AtomicInteger(0) -} \ No newline at end of file +} From 7b9e874f205b5386d596a392d6a16c173505a4eb Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:32:40 +0800 Subject: [PATCH 17/35] improvement_add_job_log fix 1. provide Option[Int] with JobId --- .../org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 4b431da435d..a3bad236bda 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -93,11 +93,13 @@ class SparkConsoleProgressBar( val bar = stages.map { s => val total = s.numTasks val result = findJobId(s.stageId) - val jobId = result.getOrElse(-1) var jobHeader = s"[There is no job about this stage]" - if (jobId != -1) { + if (result.isDefined) { + val jobId = result.get jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + s"/ ${liveJobs.get(jobId).numStages}) Stages] " + } else { + jobHeader = s"[No job match the stage]" } val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" From e8a510891cb3fd1f41e01d7981b0662d076d5aeb Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:32:58 +0800 Subject: [PATCH 18/35] improvement_add_job_log fix 1. provide Option[Int] with JobId --- .../scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index a3bad236bda..c79df48dbd5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -98,8 +98,6 @@ class SparkConsoleProgressBar( val jobId = result.get jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + s"/ ${liveJobs.get(jobId).numStages}) Stages] " - } else { - jobHeader = s"[No job match the stage]" } val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" From a973cdde6e7240e6734182d47bfebe3ec1a38d24 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Fri, 13 Oct 2023 16:49:37 +0800 Subject: [PATCH 19/35] improvement_add_job_log fix 1. provide Option[Int] with JobId --- .../scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index c79df48dbd5..195f1418c13 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -83,6 +83,7 @@ class SparkConsoleProgressBar( case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId } } + /** * Show progress bar in console. The progress bar is displayed in the next line * after your last output, keeps overwriting itself to hold in one line. The logging will follow From 10a56b15951e0556dd70d4eb3b37a6ddee6cba55 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 17 Oct 2023 18:55:27 +0800 Subject: [PATCH 20/35] add showProgress with jobInfo Unit Test --- .../spark/kyuubi/SQLOperationListener.scala | 15 +++++++++------ .../spark/kyuubi/SparkConsoleProgressBar.scala | 11 ++++------- .../org/apache/spark/kyuubi/StageStatus.scala | 8 ++++---- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index f22449d8fa4..6adc3fdb4ba 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -85,7 +85,7 @@ class SQLOperationListener( override def onJobStart(jobStart: SparkListenerJobStart): Unit = { if (sameGroupId(jobStart.properties)) { val jobId = jobStart.jobId - val stageIds = jobStart.stageInfos.map(_.stageId) + val stageIds = jobStart.stageInfos.map(_.stageId).toSet val stageSize = jobStart.stageInfos.size if (executionId.isEmpty) { executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) @@ -144,11 +144,14 @@ class SQLOperationListener( val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { - activeJobs.asScala.foreach(item => { - if (item._2.stageIds.contains(stageId)) { - item._2.numCompleteStages.getAndIncrement() - } - }) + stageInfo.getStatusString match { + case "succeeded" => + activeJobs.asScala.foreach(item => { + if (item._2.stageIds.contains(stageId)) { + item._2.numCompleteStages.getAndIncrement() + } + }) + } withOperationLog(super.onStageCompleted(stageCompleted)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 195f1418c13..023641c0a06 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -93,13 +93,10 @@ class SparkConsoleProgressBar( val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks - val result = findJobId(s.stageId) - var jobHeader = s"[There is no job about this stage]" - if (result.isDefined) { - val jobId = result.get - jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + - s"/ ${liveJobs.get(jobId).numStages}) Stages] " - } + val jobHeader = findJobId(s.stageId).map(jobId => + s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + + s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse( + "[There is no job about this stage] ") val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" val w = width - header.length - tailer.length diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 1803094edc8..29644f9f4c7 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -24,10 +24,10 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { } class SparkStageInfo(val stageId: Int, val numTasks: Int) { - var numActiveTasks = new AtomicInteger(0) - var numCompleteTasks = new AtomicInteger(0) + val numActiveTasks = new AtomicInteger(0) + val numCompleteTasks = new AtomicInteger(0) } -class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { - var numCompleteStages = new AtomicInteger(0) +class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) { + val numCompleteStages = new AtomicInteger(0) } From d12046dacdc0895ead816970aa36e4d515c4b1a5 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Mon, 16 Oct 2023 16:24:38 +0800 Subject: [PATCH 21/35] add showProgress with jobInfo Unit Test --- .../kyuubi/SQLOperationListenerSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index f3bf6f311ec..f732f7c3846 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -35,8 +35,8 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp override protected def jdbcUrl: String = getJdbcUrl - test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" + test("operation listener") { + val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -48,13 +48,17 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + assert(logs.exists(_.contains("started with 2 stages"))) + assert(logs.exists(_.contains("started with 1 tasks"))) + assert(logs.exists(_.contains("started with 3 tasks"))) + assert(logs.exists(_.contains("Finished stage:"))) + assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) } } } - test("operation listener") { - val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" + test("operation listener with progress job info") { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -66,11 +70,7 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.contains("started with 2 stages"))) - assert(logs.exists(_.contains("started with 1 tasks"))) - assert(logs.exists(_.contains("started with 3 tasks"))) - assert(logs.exists(_.contains("Finished stage:"))) - assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) } } } From a991b68c4e345e8c6610be45835f13af22cce8b6 Mon Sep 17 00:00:00 2001 From: david yuan <51512358+davidyuan1223@users.noreply.github.com> Date: Tue, 17 Oct 2023 18:58:17 +0800 Subject: [PATCH 22/35] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala Co-authored-by: cxzl25 --- .../src/main/scala/org/apache/spark/kyuubi/StageStatus.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 1803094edc8..9c77b3a3068 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -24,8 +24,8 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { } class SparkStageInfo(val stageId: Int, val numTasks: Int) { - var numActiveTasks = new AtomicInteger(0) - var numCompleteTasks = new AtomicInteger(0) + val numActiveTasks = new AtomicInteger(0) + val numCompleteTasks = new AtomicInteger(0) } class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { From a08799ca0f11187ef0a55663e6fde5f232353117 Mon Sep 17 00:00:00 2001 From: david yuan <51512358+davidyuan1223@users.noreply.github.com> Date: Tue, 17 Oct 2023 18:58:28 +0800 Subject: [PATCH 23/35] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala Co-authored-by: cxzl25 --- .../src/main/scala/org/apache/spark/kyuubi/StageStatus.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 9c77b3a3068..29644f9f4c7 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -28,6 +28,6 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { val numCompleteTasks = new AtomicInteger(0) } -class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { - var numCompleteStages = new AtomicInteger(0) +class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) { + val numCompleteStages = new AtomicInteger(0) } From 055e0ac96c5ec59be9a8debf669c70fda849b5f2 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 17 Oct 2023 19:00:51 +0800 Subject: [PATCH 24/35] add showProgress with jobInfo Unit Test --- .../src/main/scala/org/apache/spark/kyuubi/StageStatus.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index ba67414bc49..29644f9f4c7 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -31,4 +31,3 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) { class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) { val numCompleteStages = new AtomicInteger(0) } - From 228fd9cf31463deba7c668150c222bdfd6bc8a19 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 17 Oct 2023 19:07:16 +0800 Subject: [PATCH 25/35] add showProgress with jobInfo Unit Test --- .../spark/kyuubi/SQLOperationListener.scala | 15 +++++++++------ .../spark/kyuubi/SparkConsoleProgressBar.scala | 11 ++++------- .../org/apache/spark/kyuubi/StageStatus.scala | 8 ++++---- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index f22449d8fa4..6adc3fdb4ba 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -85,7 +85,7 @@ class SQLOperationListener( override def onJobStart(jobStart: SparkListenerJobStart): Unit = { if (sameGroupId(jobStart.properties)) { val jobId = jobStart.jobId - val stageIds = jobStart.stageInfos.map(_.stageId) + val stageIds = jobStart.stageInfos.map(_.stageId).toSet val stageSize = jobStart.stageInfos.size if (executionId.isEmpty) { executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY)) @@ -144,11 +144,14 @@ class SQLOperationListener( val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber()) activeStages.synchronized { if (activeStages.remove(stageAttempt) != null) { - activeJobs.asScala.foreach(item => { - if (item._2.stageIds.contains(stageId)) { - item._2.numCompleteStages.getAndIncrement() - } - }) + stageInfo.getStatusString match { + case "succeeded" => + activeJobs.asScala.foreach(item => { + if (item._2.stageIds.contains(stageId)) { + item._2.numCompleteStages.getAndIncrement() + } + }) + } withOperationLog(super.onStageCompleted(stageCompleted)) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 195f1418c13..023641c0a06 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -93,13 +93,10 @@ class SparkConsoleProgressBar( val width = TerminalWidth / stages.size val bar = stages.map { s => val total = s.numTasks - val result = findJobId(s.stageId) - var jobHeader = s"[There is no job about this stage]" - if (result.isDefined) { - val jobId = result.get - jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + - s"/ ${liveJobs.get(jobId).numStages}) Stages] " - } + val jobHeader = findJobId(s.stageId).map(jobId => + s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + + s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse( + "[There is no job about this stage] ") val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" val w = width - header.length - tailer.length diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala index 1803094edc8..29644f9f4c7 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -24,10 +24,10 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { } class SparkStageInfo(val stageId: Int, val numTasks: Int) { - var numActiveTasks = new AtomicInteger(0) - var numCompleteTasks = new AtomicInteger(0) + val numActiveTasks = new AtomicInteger(0) + val numCompleteTasks = new AtomicInteger(0) } -class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) { - var numCompleteStages = new AtomicInteger(0) +class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) { + val numCompleteStages = new AtomicInteger(0) } From 84b1aa0052b253f863b8b880cdd06721f2de9cd2 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 17 Oct 2023 19:56:55 +0800 Subject: [PATCH 26/35] Revert "improvement_add_job_log fix" This reverts commit 5b4aaa8b5265aedafa145cd4f8c4886ca3f2d980. --- .../kyuubi/SparkConsoleProgressBar.scala | 145 ------------------ .../org/apache/spark/kyuubi/StageStatus.scala | 33 ---- 2 files changed, 178 deletions(-) delete mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala delete mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala deleted file mode 100644 index 023641c0a06..00000000000 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kyuubi - -import java.time.{Instant, ZoneId} -import java.time.format.DateTimeFormatter -import java.util.{Locale, Timer, TimerTask} -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ - -import org.apache.kyuubi.Logging -import org.apache.kyuubi.operation.Operation - -class SparkConsoleProgressBar( - operation: Operation, - liveJobs: ConcurrentHashMap[Int, SparkJobInfo], - liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo], - updatePeriodMSec: Long, - timeFormat: String) - extends Logging { - // Carriage return - private val CR = '\r' - // Delay to show up a progress bar, in milliseconds - private val firstDelayMSec = 500L - - // The width of terminal - private val TerminalWidth = sys.env.getOrElse("COLUMNS", "80").toInt - - private var lastUpdateTime = 0L - private var lastProgressBar = "" - - val dtFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(timeFormat) - .withLocale(Locale.getDefault).withZone(ZoneId.systemDefault) - - // Schedule a refresh thread to run periodically - private val timer = new Timer( - "refresh progress for " + - operation.getHandle.identifier.toString, - true) - timer.schedule( - new TimerTask { - override def run(): Unit = { - refresh() - } - }, - firstDelayMSec, - updatePeriodMSec) - - /** - * Try to refresh the progress bar in every cycle - */ - private def refresh(): Unit = { - val now = System.currentTimeMillis() - val stages = liveStages.values.asScala.toList.sortBy(_.stageId) - if (stages.nonEmpty) { - show(now, stages.take(3)) // display at most 3 stages in same time - } - } - - /** - * Use stageId to find stage's jobId - * @param stageId stageId - * @return jobId,if not exists, return -1 - */ - private def findJobId(stageId: Int): Option[Int] = { - liveJobs.asScala.collectFirst { - case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId - } - } - - /** - * Show progress bar in console. The progress bar is displayed in the next line - * after your last output, keeps overwriting itself to hold in one line. The logging will follow - * the progress bar, then progress bar will be showed in next line without overwrite logs. - */ - private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = { - val width = TerminalWidth / stages.size - val bar = stages.map { s => - val total = s.numTasks - val jobHeader = findJobId(s.stageId).map(jobId => - s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + - s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse( - "[There is no job about this stage] ") - val header = jobHeader + s"[Stage ${s.stageId}:" - val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" - val w = width - header.length - tailer.length - val bar = - if (w > 0) { - val percent = w * s.numCompleteTasks.get / total - (0 until w).map { i => - if (i < percent) "=" else if (i == percent) ">" else " " - }.mkString("") - } else { - "" - } - header + bar + tailer - }.mkString("") - - // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed - // after idle some time) - if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) { - operation.getOperationLog.foreach(log => { - log.write(dtFormatter.format(Instant.ofEpochMilli(now)) + ' ' + bar + CR) - }) - lastUpdateTime = now - } - lastProgressBar = bar - } - - /** - * Clear the progress bar if showed. - */ - private def clear(): Unit = { - if (lastProgressBar.nonEmpty) { - operation.getOperationLog.foreach(log => { - log.write(" " * TerminalWidth + CR) - }) - lastProgressBar = "" - } - } - - /** - * Mark all the stages as finished, clear the progress bar if showed - */ - def finish(): Unit = synchronized { - clear() - timer.cancel() - } -} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala deleted file mode 100644 index 29644f9f4c7..00000000000 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.kyuubi - -import java.util.concurrent.atomic.AtomicInteger - -case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { - override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" -} - -class SparkStageInfo(val stageId: Int, val numTasks: Int) { - val numActiveTasks = new AtomicInteger(0) - val numCompleteTasks = new AtomicInteger(0) -} - -class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) { - val numCompleteStages = new AtomicInteger(0) -} From d4434a0dee3f3f495d9f99f3754454536369e3f1 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 17 Oct 2023 19:59:04 +0800 Subject: [PATCH 27/35] Revert "add showProgress with jobInfo Unit Test" This reverts commit d12046dacdc0895ead816970aa36e4d515c4b1a5. --- .../kyuubi/SQLOperationListenerSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index f732f7c3846..f3bf6f311ec 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -35,8 +35,8 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp override protected def jdbcUrl: String = getJdbcUrl - test("operation listener") { - val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" + test("operation listener with progress job info") { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -48,17 +48,13 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.contains("started with 2 stages"))) - assert(logs.exists(_.contains("started with 1 tasks"))) - assert(logs.exists(_.contains("started with 3 tasks"))) - assert(logs.exists(_.contains("Finished stage:"))) - assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) } } } - test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" + test("operation listener") { + val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -70,7 +66,11 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + assert(logs.exists(_.contains("started with 2 stages"))) + assert(logs.exists(_.contains("started with 1 tasks"))) + assert(logs.exists(_.contains("started with 3 tasks"))) + assert(logs.exists(_.contains("Finished stage:"))) + assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) } } } From 0c9ac27b58773eafe5bb159862dca164e3b606ae Mon Sep 17 00:00:00 2001 From: davidyuan Date: Tue, 17 Oct 2023 20:01:28 +0800 Subject: [PATCH 28/35] add showProgress with jobInfo Unit Test --- .../org/apache/spark/kyuubi/SQLOperationListenerSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index f3bf6f311ec..87bc263c54f 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -48,7 +48,11 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) + assert(logs.exists(_.contains("started with 2 stages"))) + assert(logs.exists(_.contains("started with 1 tasks"))) + assert(logs.exists(_.contains("started with 3 tasks"))) + assert(logs.exists(_.contains("Finished stage:"))) + assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) } } } From 4f657e728c7508bce71dba203a909be8e1debc9f Mon Sep 17 00:00:00 2001 From: davidyuan Date: Wed, 18 Oct 2023 10:52:19 +0800 Subject: [PATCH 29/35] fix deleted files --- .../kyuubi/SparkConsoleProgressBar.scala | 145 ++++++++++++++++++ .../org/apache/spark/kyuubi/StageStatus.scala | 33 ++++ 2 files changed, 178 insertions(+) create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala new file mode 100644 index 00000000000..023641c0a06 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kyuubi + +import java.time.{Instant, ZoneId} +import java.time.format.DateTimeFormatter +import java.util.{Locale, Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.operation.Operation + +class SparkConsoleProgressBar( + operation: Operation, + liveJobs: ConcurrentHashMap[Int, SparkJobInfo], + liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo], + updatePeriodMSec: Long, + timeFormat: String) + extends Logging { + // Carriage return + private val CR = '\r' + // Delay to show up a progress bar, in milliseconds + private val firstDelayMSec = 500L + + // The width of terminal + private val TerminalWidth = sys.env.getOrElse("COLUMNS", "80").toInt + + private var lastUpdateTime = 0L + private var lastProgressBar = "" + + val dtFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(timeFormat) + .withLocale(Locale.getDefault).withZone(ZoneId.systemDefault) + + // Schedule a refresh thread to run periodically + private val timer = new Timer( + "refresh progress for " + + operation.getHandle.identifier.toString, + true) + timer.schedule( + new TimerTask { + override def run(): Unit = { + refresh() + } + }, + firstDelayMSec, + updatePeriodMSec) + + /** + * Try to refresh the progress bar in every cycle + */ + private def refresh(): Unit = { + val now = System.currentTimeMillis() + val stages = liveStages.values.asScala.toList.sortBy(_.stageId) + if (stages.nonEmpty) { + show(now, stages.take(3)) // display at most 3 stages in same time + } + } + + /** + * Use stageId to find stage's jobId + * @param stageId stageId + * @return jobId,if not exists, return -1 + */ + private def findJobId(stageId: Int): Option[Int] = { + liveJobs.asScala.collectFirst { + case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId + } + } + + /** + * Show progress bar in console. The progress bar is displayed in the next line + * after your last output, keeps overwriting itself to hold in one line. The logging will follow + * the progress bar, then progress bar will be showed in next line without overwrite logs. + */ + private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = { + val width = TerminalWidth / stages.size + val bar = stages.map { s => + val total = s.numTasks + val jobHeader = findJobId(s.stageId).map(jobId => + s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " + + s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse( + "[There is no job about this stage] ") + val header = jobHeader + s"[Stage ${s.stageId}:" + val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" + val w = width - header.length - tailer.length + val bar = + if (w > 0) { + val percent = w * s.numCompleteTasks.get / total + (0 until w).map { i => + if (i < percent) "=" else if (i == percent) ">" else " " + }.mkString("") + } else { + "" + } + header + bar + tailer + }.mkString("") + + // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed + // after idle some time) + if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) { + operation.getOperationLog.foreach(log => { + log.write(dtFormatter.format(Instant.ofEpochMilli(now)) + ' ' + bar + CR) + }) + lastUpdateTime = now + } + lastProgressBar = bar + } + + /** + * Clear the progress bar if showed. + */ + private def clear(): Unit = { + if (lastProgressBar.nonEmpty) { + operation.getOperationLog.foreach(log => { + log.write(" " * TerminalWidth + CR) + }) + lastProgressBar = "" + } + } + + /** + * Mark all the stages as finished, clear the progress bar if showed + */ + def finish(): Unit = synchronized { + clear() + timer.cancel() + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala new file mode 100644 index 00000000000..29644f9f4c7 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kyuubi + +import java.util.concurrent.atomic.AtomicInteger + +case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +class SparkStageInfo(val stageId: Int, val numTasks: Int) { + val numActiveTasks = new AtomicInteger(0) + val numCompleteTasks = new AtomicInteger(0) +} + +class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) { + val numCompleteStages = new AtomicInteger(0) +} From 39751bffaa041531f0faabe5dc4bd6e4f16dd5e3 Mon Sep 17 00:00:00 2001 From: davidyuan Date: Wed, 18 Oct 2023 10:54:36 +0800 Subject: [PATCH 30/35] fix --- .../spark/kyuubi/SQLOperationListenerSuite.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index 87bc263c54f..f732f7c3846 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -35,8 +35,8 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp override protected def jdbcUrl: String = getJdbcUrl - test("operation listener with progress job info") { - val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" + test("operation listener") { + val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -57,8 +57,8 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp } } - test("operation listener") { - val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);" + test("operation listener with progress job info") { + val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);" withSessionHandle { (client, handle) => val req = new TExecuteStatementReq() req.setSessionHandle(handle) @@ -70,11 +70,7 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala - assert(logs.exists(_.contains("started with 2 stages"))) - assert(logs.exists(_.contains("started with 1 tasks"))) - assert(logs.exists(_.contains("started with 3 tasks"))) - assert(logs.exists(_.contains("Finished stage:"))) - assert(logs.exists(_.contains(s"Job ${0 + initJobId} succeeded"))) + assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]"))) } } } From 8c04dca7dbea097f772b0b244fe9a59dea094a4a Mon Sep 17 00:00:00 2001 From: david yuan <51512358+davidyuan1223@users.noreply.github.com> Date: Wed, 18 Oct 2023 14:55:27 +0800 Subject: [PATCH 31/35] Update SQLOperationListener.scala --- .../scala/org/apache/spark/kyuubi/SQLOperationListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 6adc3fdb4ba..91d91022f7a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -20,7 +20,7 @@ package org.apache.spark.kyuubi import java.util.Properties import java.util.concurrent.ConcurrentHashMap -import scala.jdk.CollectionConverters._ +import scala.collection.JavaConverters._ import org.apache.spark.scheduler._ import org.apache.spark.sql.SparkSession From 9e46356537334145817c2a2a845c9f6489382de5 Mon Sep 17 00:00:00 2001 From: david yuan <51512358+davidyuan1223@users.noreply.github.com> Date: Wed, 18 Oct 2023 15:22:43 +0800 Subject: [PATCH 32/35] Update SparkConsoleProgressBar.scala update desc about findJob --- .../org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 023641c0a06..d7a95d8a724 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -75,8 +75,8 @@ class SparkConsoleProgressBar( /** * Use stageId to find stage's jobId - * @param stageId stageId - * @return jobId,if not exists, return -1 + * @param stageId + * @return Optional jobId */ private def findJobId(stageId: Int): Option[Int] = { liveJobs.asScala.collectFirst { From 963ff18b9df251155d6329839b5203791da511a6 Mon Sep 17 00:00:00 2001 From: david yuan <51512358+davidyuan1223@users.noreply.github.com> Date: Wed, 18 Oct 2023 15:24:38 +0800 Subject: [PATCH 33/35] Update SparkConsoleProgressBar.scala --- .../scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index d7a95d8a724..55908795c38 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -76,7 +76,7 @@ class SparkConsoleProgressBar( /** * Use stageId to find stage's jobId * @param stageId - * @return Optional jobId + * @return jobId (Optional) */ private def findJobId(stageId: Int): Option[Int] = { liveJobs.asScala.collectFirst { From a06e9a17c8889941703dc9bf9abf66808bc3adec Mon Sep 17 00:00:00 2001 From: david yuan <51512358+davidyuan1223@users.noreply.github.com> Date: Tue, 24 Oct 2023 11:32:51 +0800 Subject: [PATCH 34/35] Update SparkConsoleProgressBar.scala --- .../scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala index 55908795c38..feb0d16a121 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala @@ -99,7 +99,7 @@ class SparkConsoleProgressBar( "[There is no job about this stage] ") val header = jobHeader + s"[Stage ${s.stageId}:" val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]" - val w = width - header.length - tailer.length + val w = width + jobHeader.length - header.length - tailer.length val bar = if (w > 0) { val percent = w * s.numCompleteTasks.get / total From d8d03c4c036151a4987a5f85f5083e54b1fe836a Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 25 Oct 2023 17:57:09 +0800 Subject: [PATCH 35/35] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala --- .../org/apache/spark/kyuubi/SQLOperationListener.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index 91d91022f7a..686cb1f359b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -146,11 +146,11 @@ class SQLOperationListener( if (activeStages.remove(stageAttempt) != null) { stageInfo.getStatusString match { case "succeeded" => - activeJobs.asScala.foreach(item => { - if (item._2.stageIds.contains(stageId)) { - item._2.numCompleteStages.getAndIncrement() + activeJobs.asScala.foreach { case (_, jobInfo) => + if (jobInfo.stageIds.contains(stageId)) { + jobInfo.numCompleteStages.getAndIncrement() } - }) + } } withOperationLog(super.onStageCompleted(stageCompleted)) }