From 13eb42c5d270205180140e4882bee1c5fb5080d4 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 15 Mar 2022 11:21:46 +0800 Subject: [PATCH 01/10] Use a disk-based store to save more information in live UI to help debug --- .../apache/spark/internal/config/Status.scala | 7 ++ .../apache/spark/status/AppStatusStore.scala | 27 ++++- .../sql/diagnostic/DiagnosticListener.scala | 103 ++++++++++++++++++ .../sql/diagnostic/DiagnosticStore.scala | 66 +++++++++++ .../spark/sql/execution/QueryExecution.scala | 6 +- .../spark/sql/execution/SQLExecution.scala | 6 +- .../spark/sql/execution/ui/SQLListener.scala | 6 +- .../spark/sql/internal/SharedState.scala | 7 ++ .../api/v1/sql/ApiSqlRootResource.scala | 11 ++ .../api/v1/sql/DiagnosticResource.scala | 67 ++++++++++++ .../apache/spark/status/api/v1/sql/api.scala | 10 ++ 11 files changed, 308 insertions(+), 8 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index 669fa07053ca..cf958d433022 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -70,4 +70,11 @@ private[spark] object Status { .version("3.0.0") .booleanConf .createWithDefault(false) + + val DISK_STORE_DIR_FOR_STATUS = + ConfigBuilder("spark.appStatusStore.diskStore.dir") + .doc("Local directory where to store app status that couldn't fit in memory") + .version("3.3.0") + .stringConf + .createOptional } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 03767ee83a95..eb33abcd983f 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -17,12 +17,17 @@ package org.apache.spark.status +import java.io.File +import java.nio.file.Files import java.util.{List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap +import scala.util.control.NonFatal import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS import org.apache.spark.status.api.v1 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID import org.apache.spark.ui.scope._ @@ -34,6 +39,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} */ private[spark] class AppStatusStore( val store: KVStore, + val diskStore: Option[KVStore] = None, val listener: Option[AppStatusListener] = None) { def applicationInfo(): v1.ApplicationInfo = { @@ -755,18 +761,33 @@ private[spark] class AppStatusStore( } } -private[spark] object AppStatusStore { +private[spark] object AppStatusStore extends Logging { val CURRENT_VERSION = 2L /** - * Create an in-memory store for a live application. + * Create an in-memory store for a live application. 
also create a disk store if + * the `spark.appStatusStore.diskStore.dir` is set */ def createLiveStore( conf: SparkConf, appStatusSource: Option[AppStatusSource] = None): AppStatusStore = { val store = new ElementTrackingStore(new InMemoryStore(), conf) val listener = new AppStatusListener(store, conf, true, appStatusSource) - new AppStatusStore(store, listener = Some(listener)) + // create a disk-based kv store if the directory is set + val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir => + val storePath = Files.createDirectories( + new File(storeDir, System.currentTimeMillis().toString).toPath + ).toFile + try { + Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf)) + .map(new ElementTrackingStore(_, conf)) + } catch { + case NonFatal(e) => + logWarning("Failed to create disk-based app status store: ", e) + None + } + } + new AppStatusStore(store, diskStore = diskStore, listener = Some(listener)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala new file mode 100644 index 000000000000..87723cdbf651 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.diagnostic + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.execution.ExplainMode +import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS +import org.apache.spark.status.{ElementTrackingStore, KVUtils} + +class DiagnosticListener( + conf: SparkConf, + kvStore: ElementTrackingStore) extends SparkListener { + + kvStore.addTrigger( + classOf[ExecutionDiagnosticData], + conf.get(UI_RETAINED_EXECUTIONS)) { count => + cleanupExecutions(count) + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionStart => onExecutionStart(e) + case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) + case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e) + case _ => // Ignore + } + + private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = { + val data = new AdaptiveExecutionUpdate( + event.executionId, + System.currentTimeMillis(), + event.physicalPlanDescription + ) + kvStore.write(data) + } + + private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { + val planDescriptionMode = ExplainMode.fromString(SQLConf.get.uiExplainMode) + val physicalPlan = event.qe.explainString(planDescriptionMode, Int.MaxValue) + val data = new ExecutionDiagnosticData( + event.executionId, + physicalPlan, + event.time, + None, + None + ) + // Check triggers since it's adding new netries + kvStore.write(data, checkTriggers = true) + } + + private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { + try { + val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId) + val planDescriptionMode = ExplainMode.fromString(SQLConf.get.uiExplainMode) + val physicalPlan = event.qe.explainString(planDescriptionMode, Int.MaxValue) + val data = new ExecutionDiagnosticData( + event.executionId, + physicalPlan, + existing.submissionTime, + Some(event.time), + event.executionFailure.map( + e => s"${e.getClass.getCanonicalName}: ${e.getMessage}").orElse(Some("")) + ) + kvStore.write(data) + } catch { + case _: NoSuchElementException => + // this is possibly caused by the query failed before execution. 
+ } + } + + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS) + if (countToDelete <= 0) { + return + } + val view = kvStore.view(classOf[ExecutionDiagnosticData]).index("completionTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined) + toDelete.foreach(e => kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId)) + kvStore.removeAllByIndexValues( + classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId)) + } +} + +object DiagnosticListener { + val QUEUE_NAME = "diagnostics" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala new file mode 100644 index 000000000000..a6f365adb171 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.diagnostic + +import scala.collection.JavaConverters._ + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.kvstore.{KVIndex, KVStore} + +class DiagnosticStore(store: KVStore) { + + def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = { + store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq + } + + def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = { + try { + Some(store.read(classOf[ExecutionDiagnosticData], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = { + store.view(classOf[AdaptiveExecutionUpdate]) + .index("updateTime") + .parent(executionId) + .asScala + .toSeq + } +} + +class ExecutionDiagnosticData( + @KVIndexParam val executionId: Long, + val physicalPlan: String, + val submissionTime: Long, + val completionTime: Option[Long], + val errorMessage: Option[String]) + +class AdaptiveExecutionUpdate( + @KVIndexParam("id") + val executionId: Long, + @KVIndexParam(value = "updateTime", parent = "id") + val updateTime: Long, + val physicalPlan: String) { + + @JsonIgnore @KVIndex + private def naturalIndex: Array[Long] = Array(executionId, updateTime) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 9bf8de5ea6c4..4b74a96702c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -223,9 +223,11 @@ class 
QueryExecution( append("\n") } - def explainString(mode: ExplainMode): String = { + def explainString( + mode: ExplainMode, + maxFields: Int = SQLConf.get.maxToStringFields): String = { val concat = new PlanStringConcat() - explainString(mode, SQLConf.get.maxToStringFields, concat.append) + explainString(mode, maxFields, concat.append) withRedaction { concat.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 748f75b18626..953c370297f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -96,7 +96,7 @@ object SQLExecution { var ex: Option[Throwable] = None val startTime = System.nanoTime() try { - sc.listenerBus.post(SparkListenerSQLExecutionStart( + val event = SparkListenerSQLExecutionStart( executionId = executionId, description = desc, details = callSite.longForm, @@ -105,7 +105,9 @@ object SQLExecution { // will be caught and reported in the `SparkListenerSQLExecutionEnd` sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), time = System.currentTimeMillis(), - redactedConfigs)) + redactedConfigs) + event.qe = queryExecution + sc.listenerBus.post(event) body } catch { case e: Throwable => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 26805e135b77..e3f51cbe3b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -49,7 +49,11 @@ case class SparkListenerSQLExecutionStart( sparkPlanInfo: SparkPlanInfo, time: Long, modifiedConfigs: Map[String, String] = Map.empty) - extends SparkListenerEvent + extends SparkListenerEvent { + + // The `QueryExecution` instance that represents the SQL execution + @JsonIgnore private[sql] var qe: QueryExecution = null +} @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index e894f39d9270..f6b748d24245 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.diagnostic.DiagnosticListener import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.streaming.StreamExecution @@ -118,6 +119,12 @@ private[sql] class SharedState( statusStore } + sparkContext.statusStore.diskStore.foreach { kvStore => + sparkContext.listenerBus.addToQueue( + new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]), + DiagnosticListener.QUEUE_NAME) + } + /** * A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui * data to show. 
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala index 747c05b9b062..73efe25a2787 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala @@ -31,4 +31,15 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext { def sqlList( @PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource] + + @Path("applications/{appId}/diagnostics") + def diagnosticsList( + @PathParam("appId") appId: String): Class[DiagnosticResource] = + classOf[DiagnosticResource] + + @Path("applications/{appId}/{attemptId}/diagnostics") + def diagnosticsList( + @PathParam("appId") appId: String, + @PathParam("attemptId") attemptId: String): Class[DiagnosticResource] = + classOf[DiagnosticResource] } diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala new file mode 100644 index 000000000000..d00bad4c84a7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.sql + +import java.util.Date +import javax.ws.rs._ + +import org.apache.spark.sql.diagnostic._ +import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} + +private[v1] class DiagnosticResource extends BaseAppResource { + + @GET + def diagnosticList( + @DefaultValue("0") @QueryParam("offset") offset: Int, + @DefaultValue("20") @QueryParam("length") length: Int): Seq[DiagnosticData] = { + withUI { ui => + ui.store.diskStore.map { kvStore => + val store = new DiagnosticStore(kvStore) + store.diagnosticsList(offset, length) + // Do not display the plan changes in the list + .map(d => prepareDiagnosticData(d, Seq.empty)) + }.getOrElse(Seq.empty) + } + } + + @GET + @Path("{executionId:\\d+}") + def diagnostic( + @PathParam("executionId") execId: Long): DiagnosticData = { + withUI { ui => + ui.store.diskStore.flatMap { kvStore => + val store = new DiagnosticStore(kvStore) + val updates = store.adaptiveExecutionUpdates(execId) + store.diagnostic(execId) + .map(d => prepareDiagnosticData(d, updates)) + }.getOrElse(throw new NotFoundException("unknown query execution id: " + execId)) + } + } + + private def prepareDiagnosticData( + diagnostic: ExecutionDiagnosticData, + updates: Seq[AdaptiveExecutionUpdate]): DiagnosticData = { + new DiagnosticData( + diagnostic.executionId, + diagnostic.physicalPlan, + new Date(diagnostic.submissionTime), + diagnostic.completionTime.map(t => new Date(t)), + diagnostic.errorMessage, + updates.map(u => AdaptivePlanChange(new Date(u.updateTime), u.physicalPlan))) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala index 0ddf66718bce..0236b4122edc 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala @@ -40,3 +40,13 @@ case class Node private[spark]( metrics: Seq[Metric]) case class Metric private[spark] (name: String, value: String) + +class DiagnosticData private[spark] ( + val id: Long, + val physicalPlan: String, + val submissionTime: Date, + val completionTime: Option[Date], + val errorMessage: Option[String], + val planChanges: Seq[AdaptivePlanChange]) + +case class AdaptivePlanChange(updateTime: Date, physicalPlan: String) From 947ed1735ab817e4e6fda74ecf33d39d6907d360 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 17 Mar 2022 11:28:56 +0800 Subject: [PATCH 02/10] Update Status.scala --- .../main/scala/org/apache/spark/internal/config/Status.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index cf958d433022..cdc123f39bbb 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -73,7 +73,8 @@ private[spark] object Status { val DISK_STORE_DIR_FOR_STATUS = ConfigBuilder("spark.appStatusStore.diskStore.dir") - .doc("Local directory where to store app status that couldn't fit in memory") + .doc("Local directory where to store app status. 
" + + "It's an alternative to the in-memory kv store") .version("3.3.0") .stringConf .createOptional From 8692b01aaae8eb72c01d14e4c31dd25f958109fc Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 28 Mar 2022 11:12:02 +0800 Subject: [PATCH 03/10] address commens --- .../scala/org/apache/spark/internal/config/Status.scala | 2 +- docs/monitoring.md | 9 +++++++++ .../apache/spark/sql/diagnostic/DiagnosticStore.scala | 7 +++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index cdc123f39bbb..d31ba522bac1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -75,7 +75,7 @@ private[spark] object Status { ConfigBuilder("spark.appStatusStore.diskStore.dir") .doc("Local directory where to store app status. " + "It's an alternative to the in-memory kv store") - .version("3.3.0") + .version("3.4.0") .stringConf .createOptional } diff --git a/docs/monitoring.md b/docs/monitoring.md index f2c6e3797492..7eabb81d8aa2 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -611,6 +611,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running ?planDescription=[true (default) | false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high. + + /applications/[app-id]/diagnostics/[execution-id] + Diagnostic for the given query. it includes: +
+ 1. plan change history of adaptive execution +
+ 2. physical plan description with unlimited fields + + /applications/[app-id]/environment Environment details of the given application. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala index a6f365adb171..236ee104f0e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala @@ -24,6 +24,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.status.KVUtils.KVIndexParam import org.apache.spark.util.kvstore.{KVIndex, KVStore} +/** + * Provides a view of a KVStore with methods that make it easy to query diagnostic-specific + * information. There's no state kept in this class, so it's ok to have multiple instances + * of it in an application. + */ class DiagnosticStore(store: KVStore) { def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = { @@ -47,6 +52,7 @@ class DiagnosticStore(store: KVStore) { } } +/* Represents the diagnostic data of a SQL execution */ class ExecutionDiagnosticData( @KVIndexParam val executionId: Long, val physicalPlan: String, @@ -54,6 +60,7 @@ class ExecutionDiagnosticData( val completionTime: Option[Long], val errorMessage: Option[String]) +/* Represents the plan change of an adaptive execution */ class AdaptiveExecutionUpdate( @KVIndexParam("id") val executionId: Long, From 1ad98df152a2ce42b226b565b101877080322c88 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 31 Mar 2022 11:45:44 +0800 Subject: [PATCH 04/10] address comments --- docs/monitoring.md | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++ .../sql/diagnostic/DiagnosticListener.scala | 3 ++- .../api/v1/sql/ApiSqlRootResource.scala | 16 +++++++-------- ...urce.scala => SQLDiagnosticResource.scala} | 20 +++++++++---------- .../apache/spark/status/api/v1/sql/api.scala | 2 +- 6 files changed, 33 insertions(+), 21 deletions(-) rename sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/{DiagnosticResource.scala => SQLDiagnosticResource.scala} (81%) diff --git a/docs/monitoring.md b/docs/monitoring.md index 7eabb81d8aa2..6d1bd2eefccb 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -612,7 +612,7 @@ can be identified by their `[attempt-id]`. In the API listed below, when running - /applications/[app-id]/diagnostics/[execution-id] + /applications/[app-id]/diagnostics/sql/[execution-id] Diagnostic for the given query. it includes:
1. plan change history of adaptive execution diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 910f2db5e347..6f248f265c10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3174,6 +3174,15 @@ object SQLConf { .intConf .createWithDefault(25) + val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC = + buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic") + .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect when the " + + s"output will be stored for the diagnostics API. The output will be stored in " + + s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS}") + .version("3.4.0") + .intConf + .createWithDefault(10000) + val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength") .doc("Maximum number of characters to output for a plan string. If the plan is " + "longer, further output will be truncated. The default setting always generates a full " + @@ -4412,6 +4421,8 @@ class SQLConf extends Serializable with Logging { def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) + def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC) + def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala index 87723cdbf651..69fda6681703 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala @@ -69,7 +69,8 @@ class DiagnosticListener( try { val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId) val planDescriptionMode = ExplainMode.fromString(SQLConf.get.uiExplainMode) - val physicalPlan = event.qe.explainString(planDescriptionMode, Int.MaxValue) + val physicalPlan = event.qe.explainString( + planDescriptionMode, SQLConf.get.maxToStringFieldsForDiagnostic) val data = new ExecutionDiagnosticData( event.executionId, physicalPlan, diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala index 73efe25a2787..6c727c4369d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala @@ -32,14 +32,14 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext { @PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource] - @Path("applications/{appId}/diagnostics") - def diagnosticsList( - @PathParam("appId") appId: String): Class[DiagnosticResource] = - classOf[DiagnosticResource] + @Path("applications/{appId}/diagnostics/sql") + def sqlDiagnosticsList( + @PathParam("appId") appId: String): Class[SQLDiagnosticResource] = + classOf[SQLDiagnosticResource] - @Path("applications/{appId}/{attemptId}/diagnostics") - def diagnosticsList( + @Path("applications/{appId}/{attemptId}/diagnostics/sql") + def sqlDiagnosticsList( @PathParam("appId") appId: String, - 
@PathParam("attemptId") attemptId: String): Class[DiagnosticResource] = - classOf[DiagnosticResource] + @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] = + classOf[SQLDiagnosticResource] } diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala similarity index 81% rename from sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala rename to sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala index d00bad4c84a7..8a6c81ced741 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/DiagnosticResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala @@ -23,40 +23,40 @@ import javax.ws.rs._ import org.apache.spark.sql.diagnostic._ import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} -private[v1] class DiagnosticResource extends BaseAppResource { +private[v1] class SQLDiagnosticResource extends BaseAppResource { @GET - def diagnosticList( + def sqlDiagnosticList( @DefaultValue("0") @QueryParam("offset") offset: Int, - @DefaultValue("20") @QueryParam("length") length: Int): Seq[DiagnosticData] = { + @DefaultValue("20") @QueryParam("length") length: Int): Seq[SQLDiagnosticData] = { withUI { ui => ui.store.diskStore.map { kvStore => val store = new DiagnosticStore(kvStore) store.diagnosticsList(offset, length) // Do not display the plan changes in the list - .map(d => prepareDiagnosticData(d, Seq.empty)) + .map(d => prepareSqlDiagnosticData(d, Seq.empty)) }.getOrElse(Seq.empty) } } @GET @Path("{executionId:\\d+}") - def diagnostic( - @PathParam("executionId") execId: Long): DiagnosticData = { + def sqlDiagnostic( + @PathParam("executionId") execId: Long): SQLDiagnosticData = { withUI { ui => ui.store.diskStore.flatMap { kvStore => val store = new DiagnosticStore(kvStore) val updates = store.adaptiveExecutionUpdates(execId) store.diagnostic(execId) - .map(d => prepareDiagnosticData(d, updates)) + .map(d => prepareSqlDiagnosticData(d, updates)) }.getOrElse(throw new NotFoundException("unknown query execution id: " + execId)) } } - private def prepareDiagnosticData( + private def prepareSqlDiagnosticData( diagnostic: ExecutionDiagnosticData, - updates: Seq[AdaptiveExecutionUpdate]): DiagnosticData = { - new DiagnosticData( + updates: Seq[AdaptiveExecutionUpdate]): SQLDiagnosticData = { + new SQLDiagnosticData( diagnostic.executionId, diagnostic.physicalPlan, new Date(diagnostic.submissionTime), diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala index 0236b4122edc..3cafc10352f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala @@ -41,7 +41,7 @@ case class Node private[spark]( case class Metric private[spark] (name: String, value: String) -class DiagnosticData private[spark] ( +class SQLDiagnosticData private[spark] ( val id: Long, val physicalPlan: String, val submissionTime: Date, From 847e547d45fba78ce6e5f956aca1396e5bb02269 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 31 Mar 2022 12:51:40 +0800 Subject: [PATCH 05/10] fix --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/diagnostic/DiagnosticListener.scala | 11 +++++++---- 2 files changed, 8 insertions(+), 
5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6f248f265c10..3f9d961f1538 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3178,7 +3178,7 @@ object SQLConf { buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic") .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect when the " + s"output will be stored for the diagnostics API. The output will be stored in " + - s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS}") + s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS.key}") .version("3.4.0") .intConf .createWithDefault(10000) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala index 69fda6681703..8ea1ff149646 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala @@ -52,8 +52,10 @@ class DiagnosticListener( } private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { - val planDescriptionMode = ExplainMode.fromString(SQLConf.get.uiExplainMode) - val physicalPlan = event.qe.explainString(planDescriptionMode, Int.MaxValue) + val sqlConf = event.qe.sparkSession.sessionState.conf + val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode) + val physicalPlan = event.qe.explainString( + planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic) val data = new ExecutionDiagnosticData( event.executionId, physicalPlan, @@ -68,9 +70,10 @@ class DiagnosticListener( private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { try { val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId) - val planDescriptionMode = ExplainMode.fromString(SQLConf.get.uiExplainMode) + val sqlConf = event.qe.sparkSession.sessionState.conf + val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode) val physicalPlan = event.qe.explainString( - planDescriptionMode, SQLConf.get.maxToStringFieldsForDiagnostic) + planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic) val data = new ExecutionDiagnosticData( event.executionId, physicalPlan, From b2efff6788fa055fb76ed661c09ccb382449e002 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 31 Mar 2022 18:45:23 +0800 Subject: [PATCH 06/10] fix style --- .../org/apache/spark/sql/diagnostic/DiagnosticListener.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala index 8ea1ff149646..5bf3580b6a98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala @@ -21,7 +21,6 @@ import org.apache.spark.SparkConf import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.execution.ExplainMode import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} -import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS import org.apache.spark.status.{ElementTrackingStore, KVUtils} From 49b102ea8227d44edebcb787c4ea198995651006 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 7 Apr 2022 12:10:06 +0800 Subject: [PATCH 07/10] address comment --- .../main/scala/org/apache/spark/internal/config/Status.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala index d31ba522bac1..1db7267237ff 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala @@ -72,7 +72,7 @@ private[spark] object Status { .createWithDefault(false) val DISK_STORE_DIR_FOR_STATUS = - ConfigBuilder("spark.appStatusStore.diskStore.dir") + ConfigBuilder("spark.appStatusStore.diskStoreDir") .doc("Local directory where to store app status. " + "It's an alternative to the in-memory kv store") .version("3.4.0") From 699b6f2bb289d924d4c331672f60c6ac0f7b60fb Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 13 Apr 2022 09:53:00 +0800 Subject: [PATCH 08/10] add classdoc --- .../apache/spark/sql/diagnostic/DiagnosticListener.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala index 5bf3580b6a98..7ce1093e8796 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala @@ -24,6 +24,12 @@ import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdat import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS import org.apache.spark.status.{ElementTrackingStore, KVUtils} +/** + * A Spark listener that writes diagnostic information to a data store. The information can be + * accessed by the public REST API. + * + * @param kvStore used to store the diagnostic information + */ class DiagnosticListener( conf: SparkConf, kvStore: ElementTrackingStore) extends SparkListener { From e1454b930745dbde925a3c96ea3f81c46ef96cb9 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 13 Apr 2022 10:00:06 +0800 Subject: [PATCH 09/10] trigger test From 59797e5537d2b907cbb11d944b21a95901a6909c Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Wed, 13 Apr 2022 12:44:18 +0800 Subject: [PATCH 10/10] trigger test
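
To make the end-to-end flow of this series concrete, here is a minimal, hypothetical usage sketch, assuming the final revision of the patches is applied (i.e. the config key is `spark.appStatusStore.diskStoreDir` as of PATCH 07/10 and the REST path is `/diagnostics/sql` as of PATCH 04/10). The directory path, app name, master URL, and the sample query are placeholders, not part of the patch itself.

import org.apache.spark.sql.SparkSession

object SqlDiagnosticsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sql-diagnostics-example")
      // Enables the disk-based status store; this in turn registers the
      // DiagnosticListener on its dedicated "diagnostics" listener queue.
      .config("spark.appStatusStore.diskStoreDir", "/tmp/spark-status-store")
      .getOrCreate()

    // Run any query so the listener records an ExecutionDiagnosticData entry
    // (physical plan, submission/completion time, error message if any).
    spark.range(0L, 1000000L)
      .selectExpr("id % 10 AS k", "id")
      .groupBy("k")
      .count()
      .collect()

    // While the application is alive, the diagnostic data for SQL execution id 0
    // would be served by the live UI at (host, port and app id are placeholders):
    //   http://localhost:4040/api/v1/applications/<app-id>/diagnostics/sql/0
    spark.stop()
  }
}

Under the same assumptions, the list endpoint added in ApiSqlRootResource (`/applications/[app-id]/diagnostics/sql`, with optional `?offset=` and `?length=` parameters defaulting to 0 and 20) would return the stored executions without their adaptive plan-change history, while the per-execution endpoint shown above includes the AQE plan updates and the plan description rendered with `spark.sql.debug.maxToStringFieldsForDiagnostic` fields.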