diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index 669fa07053ca..1db7267237ff 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -70,4 +70,12 @@ private[spark] object Status {
.version("3.0.0")
.booleanConf
.createWithDefault(false)
+
+ val DISK_STORE_DIR_FOR_STATUS =
+ ConfigBuilder("spark.appStatusStore.diskStoreDir")
+ .doc("Local directory where to store app status. " +
+ "It's an alternative to the in-memory kv store")
+ .version("3.4.0")
+ .stringConf
+ .createOptional
}
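
For illustration, a minimal sketch (not part of the patch) of enabling the new option when building a session; the directory shown is a placeholder and only needs to be writable by the driver:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: enable the disk-backed app status store. The path is a
// placeholder; when the option is unset, only the in-memory store is used.
val spark = SparkSession.builder()
  .appName("diagnostics-demo")
  .master("local[*]")
  .config("spark.appStatusStore.diskStoreDir", "/tmp/spark-status-diag")
  .getOrCreate()
```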
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 34155e3e3309..b455850d6092 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,12 +17,17 @@
package org.apache.spark.status
+import java.io.File
+import java.nio.file.Files
import java.util.{List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
import org.apache.spark.ui.scope._
@@ -34,6 +39,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
*/
private[spark] class AppStatusStore(
val store: KVStore,
+ val diskStore: Option[KVStore] = None,
val listener: Option[AppStatusListener] = None) {
def applicationInfo(): v1.ApplicationInfo = {
@@ -755,18 +761,33 @@ private[spark] class AppStatusStore(
}
}
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
val CURRENT_VERSION = 2L
/**
- * Create an in-memory store for a live application.
+ * Create an in-memory store for a live application. Also create a disk store if
+ * `spark.appStatusStore.diskStoreDir` is set.
*/
def createLiveStore(
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
- new AppStatusStore(store, listener = Some(listener))
+ // create a disk-based kv store if the directory is set
+ val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
+ try {
+ val storePath = Files.createDirectories(
+ new File(storeDir, System.currentTimeMillis().toString).toPath
+ ).toFile
+ Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf))
+ .map(new ElementTrackingStore(_, conf))
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Failed to create disk-based app status store", e)
+ None
+ }
+ }
+ new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
}
}
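
As a rough sketch of the new behaviour (not part of the patch): `createLiveStore` still returns an in-memory-backed store, and `diskStore` is populated only when the directory is configured. Since `AppStatusStore` is `private[spark]`, code like this only compiles inside Spark itself, e.g. in a test under the `org.apache.spark` namespace:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.AppStatusStore

// Sketch only: with the directory set, diskStore is Some(ElementTrackingStore);
// without it, diskStore stays None and the previous behaviour is unchanged.
val conf = new SparkConf()
  .set("spark.appStatusStore.diskStoreDir", "/tmp/spark-status-diag")
val statusStore = AppStatusStore.createLiveStore(conf)
assert(statusStore.diskStore.isDefined)
```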
diff --git a/docs/monitoring.md b/docs/monitoring.md
index f2c6e3797492..6d1bd2eefccb 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -611,6 +611,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
?planDescription=[true (default) | false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high.
+
+ /applications/[app-id]/diagnostics/sql/[execution-id] |
+ Diagnostics for the given query. It includes:
+
+ 1. The plan change history under adaptive query execution
+
+ 2. The physical plan description, rendered with the larger spark.sql.debug.maxToStringFieldsForDiagnostic field limit
+ |
+
/applications/[app-id]/environment |
Environment details of the given application. |
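
A hedged example of hitting the new endpoint on a running driver; the host, port, application id, and execution id below are placeholders:

```scala
// Illustrative only: fetch the diagnostics JSON for execution id 0 from a driver
// UI assumed to be at localhost:4040; adjust the app id and port for your setup.
val url = "http://localhost:4040/api/v1/applications/app-20240101000000-0000/diagnostics/sql/0"
val json = scala.io.Source.fromURL(url).mkString
println(json)
```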
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 59ad40b512a7..e4d0e44740b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3198,6 +3198,15 @@ object SQLConf {
.intConf
.createWithDefault(25)
+ val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
+ buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
+ .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect when the " +
+ s"output will be stored for the diagnostics API. The output will be stored in " +
+ s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS.key}")
+ .version("3.4.0")
+ .intConf
+ .createWithDefault(10000)
+
val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
.doc("Maximum number of characters to output for a plan string. If the plan is " +
"longer, further output will be truncated. The default setting always generates a full " +
@@ -4439,6 +4448,8 @@ class SQLConf extends Serializable with Logging {
def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
+ def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)
+
def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
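
A small usage sketch for the new limit (the value is arbitrary, not a recommendation); it assumes an active session named `spark`, as in spark-shell:

```scala
// Sketch: raise the field limit used only when plans are rendered for the
// diagnostics API; the in-memory UI limit (spark.sql.debug.maxToStringFields)
// is unaffected.
spark.conf.set("spark.sql.debug.maxToStringFieldsForDiagnostic", 50000L)
```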
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
new file mode 100644
index 000000000000..7ce1093e8796
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.ExplainMode
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+
+/**
+ * A Spark listener that writes diagnostic information to a data store. The information can be
+ * accessed through the public REST API.
+ *
+ * @param kvStore used to store the diagnostic information
+ */
+class DiagnosticListener(
+ conf: SparkConf,
+ kvStore: ElementTrackingStore) extends SparkListener {
+
+ kvStore.addTrigger(
+ classOf[ExecutionDiagnosticData],
+ conf.get(UI_RETAINED_EXECUTIONS)) { count =>
+ cleanupExecutions(count)
+ }
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+ case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+ case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e)
+ case _ => // Ignore
+ }
+
+ private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
+ val data = new AdaptiveExecutionUpdate(
+ event.executionId,
+ System.currentTimeMillis(),
+ event.physicalPlanDescription
+ )
+ kvStore.write(data)
+ }
+
+ private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+ val sqlConf = event.qe.sparkSession.sessionState.conf
+ val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
+ val physicalPlan = event.qe.explainString(
+ planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
+ val data = new ExecutionDiagnosticData(
+ event.executionId,
+ physicalPlan,
+ event.time,
+ None,
+ None
+ )
+ // Check triggers since it's adding new entries
+ kvStore.write(data, checkTriggers = true)
+ }
+
+ private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+ try {
+ val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId)
+ val sqlConf = event.qe.sparkSession.sessionState.conf
+ val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
+ val physicalPlan = event.qe.explainString(
+ planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
+ val data = new ExecutionDiagnosticData(
+ event.executionId,
+ physicalPlan,
+ existing.submissionTime,
+ Some(event.time),
+ event.executionFailure.map(
+ e => s"${e.getClass.getCanonicalName}: ${e.getMessage}").orElse(Some(""))
+ )
+ kvStore.write(data)
+ } catch {
+ case _: NoSuchElementException =>
+ // This is possibly caused by the query failing before execution.
+ }
+ }
+
+ private def cleanupExecutions(count: Long): Unit = {
+ val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
+ if (countToDelete <= 0) {
+ return
+ }
+ val view = kvStore.view(classOf[ExecutionDiagnosticData]).index("completionTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
+ toDelete.foreach(e => kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId))
+ kvStore.removeAllByIndexValues(
+ classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId))
+ }
+}
+
+object DiagnosticListener {
+ val QUEUE_NAME = "diagnostics"
+}
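
For orientation, a condensed sketch of how this listener gets wired up; it mirrors the SharedState change later in this patch, relies on `private[spark]` APIs just as SharedState does, and assumes the disk-backed store created by `AppStatusStore.createLiveStore` is an `ElementTrackingStore`:

```scala
import org.apache.spark.sql.diagnostic.DiagnosticListener
import org.apache.spark.status.ElementTrackingStore

// Sketch of the wiring done in SharedState: attach the listener to its own
// listener-bus queue, backed by the optional disk store. `sparkContext` and
// `conf` are assumed to be an active SparkContext and its SparkConf.
sparkContext.statusStore.diskStore.foreach { kvStore =>
  sparkContext.listenerBus.addToQueue(
    new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
    DiagnosticListener.QUEUE_NAME)
}
```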
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
new file mode 100644
index 000000000000..236ee104f0e9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query diagnostic-specific
+ * information. There's no state kept in this class, so it's ok to have multiple instances
+ * of it in an application.
+ */
+class DiagnosticStore(store: KVStore) {
+
+ def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
+ store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq
+ }
+
+ def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
+ try {
+ Some(store.read(classOf[ExecutionDiagnosticData], executionId))
+ } catch {
+ case _: NoSuchElementException => None
+ }
+ }
+
+ def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
+ store.view(classOf[AdaptiveExecutionUpdate])
+ .index("updateTime")
+ .parent(executionId)
+ .asScala
+ .toSeq
+ }
+}
+
+/* Represents the diagnostic data of a SQL execution */
+class ExecutionDiagnosticData(
+ @KVIndexParam val executionId: Long,
+ val physicalPlan: String,
+ val submissionTime: Long,
+ val completionTime: Option[Long],
+ val errorMessage: Option[String]) {
+
+ // Index used by DiagnosticListener.cleanupExecutions (mirrors SQLExecutionUIData);
+ // executions that have not completed yet sort first with -1.
+ @JsonIgnore @KVIndex("completionTime")
+ private def completionTimeIndex: Long = completionTime.getOrElse(-1L)
+}
+
+/* Represents the plan change of an adaptive execution */
+class AdaptiveExecutionUpdate(
+ @KVIndexParam("id")
+ val executionId: Long,
+ @KVIndexParam(value = "updateTime", parent = "id")
+ val updateTime: Long,
+ val physicalPlan: String) {
+
+ @JsonIgnore @KVIndex
+ private def naturalIndex: Array[Long] = Array(executionId, updateTime)
+}
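
A brief read-side sketch, roughly what SQLDiagnosticResource below does; `kvStore` is assumed to be the disk-backed store exposed as `AppStatusStore.diskStore`:

```scala
import org.apache.spark.sql.diagnostic.DiagnosticStore

// Sketch only: look up one execution's diagnostic record plus its AQE plan
// update history; `kvStore` is an assumed handle to the disk-backed KVStore.
val diagnostics = new DiagnosticStore(kvStore)
diagnostics.diagnostic(executionId = 0L).foreach { d =>
  println(s"submitted at ${d.submissionTime}, completed: ${d.completionTime.isDefined}")
}
diagnostics.adaptiveExecutionUpdates(executionId = 0L)
  .foreach(u => println(s"plan updated at ${u.updateTime}"))
```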
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9bf8de5ea6c4..4b74a96702c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -223,9 +223,11 @@ class QueryExecution(
append("\n")
}
- def explainString(mode: ExplainMode): String = {
+ def explainString(
+ mode: ExplainMode,
+ maxFields: Int = SQLConf.get.maxToStringFields): String = {
val concat = new PlanStringConcat()
- explainString(mode, SQLConf.get.maxToStringFields, concat.append)
+ explainString(mode, maxFields, concat.append)
withRedaction {
concat.toString
}
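
Existing callers are unaffected by the new default parameter. A hedged sketch of an explicit call, where `df` is assumed to be any DataFrame and 10000 happens to match the new config's default:

```scala
import org.apache.spark.sql.execution.ExplainMode

// Sketch: render a plan string with a larger field limit than the UI default.
// `df` is an arbitrary DataFrame; "formatted" is one of the existing explain modes.
val planText = df.queryExecution.explainString(
  ExplainMode.fromString("formatted"), maxFields = 10000)
```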
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 748f75b18626..953c370297f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -96,7 +96,7 @@ object SQLExecution {
var ex: Option[Throwable] = None
val startTime = System.nanoTime()
try {
- sc.listenerBus.post(SparkListenerSQLExecutionStart(
+ val event = SparkListenerSQLExecutionStart(
executionId = executionId,
description = desc,
details = callSite.longForm,
@@ -105,7 +105,9 @@ object SQLExecution {
// will be caught and reported in the `SparkListenerSQLExecutionEnd`
sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
time = System.currentTimeMillis(),
- redactedConfigs))
+ redactedConfigs)
+ event.qe = queryExecution
+ sc.listenerBus.post(event)
body
} catch {
case e: Throwable =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 26805e135b77..e3f51cbe3b00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -49,7 +49,11 @@ case class SparkListenerSQLExecutionStart(
sparkPlanInfo: SparkPlanInfo,
time: Long,
modifiedConfigs: Map[String, String] = Map.empty)
- extends SparkListenerEvent
+ extends SparkListenerEvent {
+
+ // The `QueryExecution` instance that represents the SQL execution
+ @JsonIgnore private[sql] var qe: QueryExecution = null
+}
@DeveloperApi
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index e894f39d9270..f6b748d24245 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.diagnostic.DiagnosticListener
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
@@ -118,6 +119,12 @@ private[sql] class SharedState(
statusStore
}
+ sparkContext.statusStore.diskStore.foreach { kvStore =>
+ sparkContext.listenerBus.addToQueue(
+ new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
+ DiagnosticListener.QUEUE_NAME)
+ }
+
/**
* A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
* data to show.
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
index 747c05b9b062..6c727c4369d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
@@ -31,4 +31,15 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
def sqlList(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]
+
+ @Path("applications/{appId}/diagnostics/sql")
+ def sqlDiagnosticsList(
+ @PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
+ classOf[SQLDiagnosticResource]
+
+ @Path("applications/{appId}/{attemptId}/diagnostics/sql")
+ def sqlDiagnosticsList(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] =
+ classOf[SQLDiagnosticResource]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
new file mode 100644
index 000000000000..8a6c81ced741
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+import javax.ws.rs._
+
+import org.apache.spark.sql.diagnostic._
+import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
+
+private[v1] class SQLDiagnosticResource extends BaseAppResource {
+
+ @GET
+ def sqlDiagnosticList(
+ @DefaultValue("0") @QueryParam("offset") offset: Int,
+ @DefaultValue("20") @QueryParam("length") length: Int): Seq[SQLDiagnosticData] = {
+ withUI { ui =>
+ ui.store.diskStore.map { kvStore =>
+ val store = new DiagnosticStore(kvStore)
+ store.diagnosticsList(offset, length)
+ // Do not display the plan changes in the list
+ .map(d => prepareSqlDiagnosticData(d, Seq.empty))
+ }.getOrElse(Seq.empty)
+ }
+ }
+
+ @GET
+ @Path("{executionId:\\d+}")
+ def sqlDiagnostic(
+ @PathParam("executionId") execId: Long): SQLDiagnosticData = {
+ withUI { ui =>
+ ui.store.diskStore.flatMap { kvStore =>
+ val store = new DiagnosticStore(kvStore)
+ val updates = store.adaptiveExecutionUpdates(execId)
+ store.diagnostic(execId)
+ .map(d => prepareSqlDiagnosticData(d, updates))
+ }.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
+ }
+ }
+
+ private def prepareSqlDiagnosticData(
+ diagnostic: ExecutionDiagnosticData,
+ updates: Seq[AdaptiveExecutionUpdate]): SQLDiagnosticData = {
+ new SQLDiagnosticData(
+ diagnostic.executionId,
+ diagnostic.physicalPlan,
+ new Date(diagnostic.submissionTime),
+ diagnostic.completionTime.map(t => new Date(t)),
+ diagnostic.errorMessage,
+ updates.map(u => AdaptivePlanChange(new Date(u.updateTime), u.physicalPlan)))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 0ddf66718bce..3cafc10352f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -40,3 +40,13 @@ case class Node private[spark](
metrics: Seq[Metric])
case class Metric private[spark] (name: String, value: String)
+
+class SQLDiagnosticData private[spark] (
+ val id: Long,
+ val physicalPlan: String,
+ val submissionTime: Date,
+ val completionTime: Option[Date],
+ val errorMessage: Option[String],
+ val planChanges: Seq[AdaptivePlanChange])
+
+case class AdaptivePlanChange(updateTime: Date, physicalPlan: String)