Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
56b91a3
fix_4186
davidyuan1223 Oct 10, 2023
6209c34
[Improvement] spark showProgress can briefly output info of the job #…
davidyuan1223 Oct 10, 2023
d30492e
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 12, 2023
32ad075
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 12, 2023
4564ef9
improvement_add_job_log fix
davidyuan1223 Oct 12, 2023
e15fc71
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
249a422
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
5cf8714
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
49debfb
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
9fa8e73
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 16, 2023
d4bdec7
fix_4186
davidyuan1223 Oct 10, 2023
c07535a
[Improvement] spark showProgress can briefly output info of the job #…
davidyuan1223 Oct 10, 2023
af05089
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 12, 2023
59340b7
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 12, 2023
780f9d1
improvement_add_job_log fix
davidyuan1223 Oct 12, 2023
5b4aaa8
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
7b9e874
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
e8a5108
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
a973cdd
improvement_add_job_log fix
davidyuan1223 Oct 13, 2023
10a56b1
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 17, 2023
d12046d
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 16, 2023
a991b68
Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/sp…
davidyuan1223 Oct 17, 2023
a08799c
Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/sp…
davidyuan1223 Oct 17, 2023
b226ada
Merge remote-tracking branch 'origin/improvement_add_job_log' into im…
davidyuan1223 Oct 17, 2023
e4aac34
Merge remote-tracking branch 'origin/improvement_add_job_log' into im…
davidyuan1223 Oct 17, 2023
055e0ac
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 17, 2023
228fd9c
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 17, 2023
66126f9
Merge remote-tracking branch 'origin/improvement_add_job_log' into im…
davidyuan1223 Oct 17, 2023
84b1aa0
Revert "improvement_add_job_log fix"
davidyuan1223 Oct 17, 2023
d4434a0
Revert "add showProgress with jobInfo Unit Test"
davidyuan1223 Oct 17, 2023
0c9ac27
add showProgress with jobInfo Unit Test
davidyuan1223 Oct 17, 2023
86756eb
Merge branch 'apache:master' into improvement_add_job_log
davidyuan1223 Oct 17, 2023
4f657e7
fix deleted files
davidyuan1223 Oct 18, 2023
39751bf
fix
davidyuan1223 Oct 18, 2023
8c04dca
Update SQLOperationListener.scala
davidyuan1223 Oct 18, 2023
9e46356
Update SparkConsoleProgressBar.scala
davidyuan1223 Oct 18, 2023
963ff18
Update SparkConsoleProgressBar.scala
davidyuan1223 Oct 18, 2023
8544084
Merge branch 'apache:master' into improvement_add_job_log
davidyuan1223 Oct 24, 2023
a06e9a1
Update SparkConsoleProgressBar.scala
davidyuan1223 Oct 24, 2023
d8d03c4
Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/sp…
pan3793 Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.kyuubi
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.scheduler._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
Expand All @@ -44,7 +46,7 @@ class SQLOperationListener(
spark: SparkSession) extends StatsReportListener with Logging {

private val operationId: String = operation.getHandle.identifier.toString
private lazy val activeJobs = new java.util.HashSet[Int]()
private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]()
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

Expand All @@ -53,6 +55,7 @@ class SQLOperationListener(
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
Expand All @@ -79,9 +82,10 @@ class SQLOperationListener(
}
}

override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
val stageIds = jobStart.stageInfos.map(_.stageId).toSet
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
Expand All @@ -93,17 +97,19 @@ class SQLOperationListener(
case _ =>
}
}
activeJobs.put(
jobId,
new SparkJobInfo(stageSize, stageIds))
withOperationLog {
activeJobs.add(jobId)
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
}
}

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
val jobId = jobEnd.jobId
if (activeJobs.remove(jobId)) {
if (activeJobs.remove(jobId) != null) {
val hint = jobEnd.jobResult match {
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
Expand Down Expand Up @@ -134,9 +140,18 @@ class SQLOperationListener(

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
val stageId = stageInfo.stageId
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
stageInfo.getStatusString match {
case "succeeded" =>
activeJobs.asScala.foreach { case (_, jobInfo) =>
if (jobInfo.stageIds.contains(stageId)) {
jobInfo.numCompleteStages.getAndIncrement()
}
}
}
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation

class SparkConsoleProgressBar(
operation: Operation,
liveJobs: ConcurrentHashMap[Int, SparkJobInfo],
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
updatePeriodMSec: Long,
timeFormat: String)
Expand Down Expand Up @@ -72,6 +73,17 @@ class SparkConsoleProgressBar(
}
}

/**
 * Locate the live job that owns the given stage.
 *
 * @param stageId id of the stage to look up
 * @return the id of the first registered job whose stage set contains
 *         `stageId`, or `None` when no live job claims this stage
 */
private def findJobId(stageId: Int): Option[Int] = {
  liveJobs.asScala
    .find { case (_, jobInfo) => jobInfo.stageIds.contains(stageId) }
    .map { case (jobId, _) => jobId }
}

/**
* Show progress bar in console. The progress bar is displayed in the next line
* after your last output, keeps overwriting itself to hold in one line. The logging will follow
Expand All @@ -81,9 +93,13 @@ class SparkConsoleProgressBar(
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
val header = s"[Stage ${s.stageId}:"
val jobHeader = findJobId(s.stageId).map(jobId =>
s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " +
s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse(
"[There is no job about this stage] ")
val header = jobHeader + s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
val w = width - header.length - tailer.length
val w = width + jobHeader.length - header.length - tailer.length
val bar =
if (w > 0) {
val percent = w * s.numCompleteTasks.get / total
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
}

class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
val numActiveTasks = new AtomicInteger(0)
val numCompleteTasks = new AtomicInteger(0)
}

/**
 * Progress-tracking state for a single Spark job.
 *
 * @param numStages total number of stages the job was submitted with
 *                  (taken from `SparkListenerJobStart.stageInfos.size`)
 * @param stageIds  ids of all stages belonging to this job, used to map a
 *                  completed stage back to its owning job
 */
class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) {
// Count of this job's completed stages; AtomicInteger because it is
// incremented by the listener (onStageCompleted) and read by the
// console progress bar while rendering.
val numCompleteStages = new AtomicInteger(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import scala.collection.JavaConverters.asScalaBufferConverter
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TOperationHandle}
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_SPARK_LISTENER_ENABLED
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper

class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {

override def withKyuubiConf: Map[String, String] = Map.empty
override def withKyuubiConf: Map[String, String] = Map(
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")

override protected def jdbcUrl: String = getJdbcUrl

Expand All @@ -54,6 +57,24 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelp
}
}

// Verifies that, with ENGINE_SPARK_SHOW_PROGRESS enabled, the operation log
// contains progress lines carrying job-level info, i.e. lines matching
// "[Job <id> (<done> / <total>) Stages] [Stage ...]".
test("operation listener with progress job info") {
// Two partitions each sleeping 10s keep the job running long enough for the
// progress bar (200ms update interval per withKyuubiConf) to emit lines.
val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
withSessionHandle { (client, handle) =>
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val opHandle = tExecuteStatementResp.getOperationHandle
val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
// Fetch type 1 retrieves the operation log rather than the result set
// (presumably per the Hive Thrift RPC convention — confirm against spec).
fetchResultsReq.setFetchType(1.toShort)
// Poll the log until a job-annotated progress line shows up, or time out.
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val resultsResp = client.FetchResults(fetchResultsReq)
val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
}
}
}

test("SQLOperationListener configurable") {
val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);"
withSessionHandle { (client, handle) =>
Expand Down