diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index 64462fdee540..669fa07053ca 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -70,12 +70,4 @@ private[spark] object Status {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(false)
-
-  val DISK_STORE_DIR_FOR_STATUS =
-    ConfigBuilder("spark.appStatusStore.diskStoreDir")
-      .doc("Local directory where to store diagnostic information of SQL executions. " +
-        "This configuration is only for live UI.")
-      .version("3.4.0")
-      .stringConf
-      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 4c0ac5e3192e..cec99254e966 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,17 +17,12 @@
 
 package org.apache.spark.status
 
-import java.io.File
-import java.nio.file.Files
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
-import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
@@ -39,7 +34,6 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
-    val diskStore: Option[KVStore] = None,
     val listener: Option[AppStatusListener] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
@@ -765,33 +759,18 @@ private[spark] class AppStatusStore(
   }
 }
 
-private[spark] object AppStatusStore extends Logging {
+private[spark] object AppStatusStore {
 
   val CURRENT_VERSION = 2L
 
   /**
-   * Create an in-memory store for a live application. also create a disk store if
-   * the `spark.appStatusStore.diskStore.dir` is set
+   * Create an in-memory store for a live application.
    */
   def createLiveStore(
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
-    // create a disk-based kv store if the directory is set
-    val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
-      val storePath = Files.createDirectories(
-        new File(storeDir, System.currentTimeMillis().toString).toPath
-      ).toFile
-      try {
-        Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf))
-          .map(new ElementTrackingStore(_, conf))
-      } catch {
-        case NonFatal(e) =>
-          logWarning("Failed to create disk-based app status store: ", e)
-          None
-      }
-    }
-    new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
+    new AppStatusStore(store, listener = Some(listener))
   }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 89cce0909e9e..14048afbe0f4 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -630,17 +630,6 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     <br><code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand for the given query when Physical Plan size is high.
     </td>
   </tr>
-  <tr>
-    <td><code>/applications/[app-id]/diagnostics/sql/[execution-id]</code></td>
-    <td>Diagnostic for the given query, including:
-    <br>
-    1. plan change history of adaptive execution
-    <br>
-    2. physical plan description with unlimited fields
-    <br>
-    This API requires setting <code>spark.appStatusStore.diskStoreDir</code> for storing the diagnostic information.
-    </td>
-  </tr>
   <tr>
     <td><code>/applications/[app-id]/environment</code></td>
     <td>Environment details of the given application.</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0f3dc3cf44c4..2b52b6bfbf48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3373,15 +3373,6 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
-  val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
-    buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
-      .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect when the " +
-        s"output will be stored for the diagnostics API. The output will be stored in " +
-        s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS.key}")
-      .version("3.4.0")
-      .intConf
-      .createWithDefault(10000)
-
   val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
     .doc("Maximum number of characters to output for a plan string. If the plan is " +
       "longer, further output will be truncated. The default setting always generates a full " +
@@ -4737,8 +4728,6 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
-  def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)
-
   def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
 
   def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
deleted file mode 100644
index 6899790603b8..000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.diagnostic
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.execution.ExplainMode
-import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
-import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
-import org.apache.spark.status.{ElementTrackingStore, KVUtils}
-
-/**
- * A Spark listener that writes diagnostic information to a data store. The information can be
- * accessed by the public REST API.
- *
- * @param kvStore used to store the diagnostic information
- */
-private[spark] class DiagnosticListener(
-    conf: SparkConf,
-    kvStore: ElementTrackingStore) extends SparkListener {
-
-  kvStore.addTrigger(
-    classOf[ExecutionDiagnosticData],
-    conf.get(UI_RETAINED_EXECUTIONS)) { count =>
-    cleanupExecutions(count)
-  }
-
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
-    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
-    case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e)
-    case _ => // Ignore
-  }
-
-  private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
-    val data = new AdaptiveExecutionUpdate(
-      event.executionId,
-      System.currentTimeMillis(),
-      event.physicalPlanDescription
-    )
-    kvStore.write(data)
-  }
-
-  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    val sqlConf = event.qe.sparkSession.sessionState.conf
-    val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
-    val physicalPlan = event.qe.explainString(
-      planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
-    val data = new ExecutionDiagnosticData(
-      event.executionId,
-      physicalPlan,
-      event.time,
-      None,
-      None
-    )
-    // Check triggers since it's adding new netries
-    kvStore.write(data, checkTriggers = true)
-  }
-
-  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
-    try {
-      val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId)
-      val sqlConf = event.qe.sparkSession.sessionState.conf
-      val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
-      val physicalPlan = event.qe.explainString(
-        planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
-      val data = new ExecutionDiagnosticData(
-        event.executionId,
-        physicalPlan,
-        existing.submissionTime,
-        Some(event.time),
-        event.executionFailure.map(
-          e => s"${e.getClass.getCanonicalName}: ${e.getMessage}").orElse(Some(""))
-      )
-      kvStore.write(data)
-    } catch {
-      case _: NoSuchElementException =>
-        // this is possibly caused by the query failed before execution.
-    }
-  }
-
-  private def cleanupExecutions(count: Long): Unit = {
-    val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
-    if (countToDelete <= 0) {
-      return
-    }
-    val view = kvStore.view(classOf[ExecutionDiagnosticData]).index("completionTime").first(0L)
-    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
-    toDelete.foreach(e => kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId))
-    kvStore.removeAllByIndexValues(
-      classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId))
-  }
-}
-
-private[spark] object DiagnosticListener {
-  val QUEUE_NAME = "diagnostics"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
deleted file mode 100644
index 53ff787fe672..000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.diagnostic
-
-import com.fasterxml.jackson.annotation.JsonIgnore
-
-import org.apache.spark.status.KVUtils
-import org.apache.spark.status.KVUtils.KVIndexParam
-import org.apache.spark.util.kvstore.{KVIndex, KVStore}
-
-/**
- * Provides a view of a KVStore with methods that make it easy to query diagnostic-specific
- * information. There's no state kept in this class, so it's ok to have multiple instances
- * of it in an application.
- */
-private[spark] class DiagnosticStore(store: KVStore) {
-
-  def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
-    KVUtils.viewToSeq(store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length))
-  }
-
-  def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
-    try {
-      Some(store.read(classOf[ExecutionDiagnosticData], executionId))
-    } catch {
-      case _: NoSuchElementException => None
-    }
-  }
-
-  def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
-    KVUtils.viewToSeq(
-      store.view(classOf[AdaptiveExecutionUpdate])
-        .index("updateTime")
-        .parent(executionId))
-  }
-}
-
-/* Represents the diagnostic data of a SQL execution */
-private[spark] class ExecutionDiagnosticData(
-    @KVIndexParam val executionId: Long,
-    val physicalPlan: String,
-    val submissionTime: Long,
-    val completionTime: Option[Long],
-    val errorMessage: Option[String])
-
-/* Represents the plan change of an adaptive execution */
-private[spark] class AdaptiveExecutionUpdate(
-    @KVIndexParam("id")
-    val executionId: Long,
-    @KVIndexParam(value = "updateTime", parent = "id")
-    val updateTime: Long,
-    val physicalPlan: String) {
-
-  @JsonIgnore @KVIndex
-  private def naturalIndex: Array[Long] = Array(executionId, updateTime)
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 798a219d2435..891c5fa5bc13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -213,11 +213,9 @@ class QueryExecution(
     append("\n")
   }
 
-  def explainString(
-      mode: ExplainMode,
-      maxFields: Int = SQLConf.get.maxToStringFields): String = {
+  def explainString(mode: ExplainMode): String = {
     val concat = new PlanStringConcat()
-    explainString(mode, maxFields, concat.append)
+    explainString(mode, SQLConf.get.maxToStringFields, concat.append)
     withRedaction {
       concat.toString
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1c36131ddb40..4db7c85b4ff3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -96,7 +96,7 @@ object SQLExecution {
       var ex: Option[Throwable] = None
       val startTime = System.nanoTime()
       try {
-        val event = SparkListenerSQLExecutionStart(
+        sc.listenerBus.post(SparkListenerSQLExecutionStart(
           executionId = executionId,
           description = desc,
           details = callSite.longForm,
@@ -105,9 +105,7 @@ object SQLExecution {
           // will be caught and reported in the `SparkListenerSQLExecutionEnd`
           sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
           time = System.currentTimeMillis(),
-          redactedConfigs)
-        event.qe = queryExecution
-        sc.listenerBus.post(event)
+          redactedConfigs))
         body
       } catch {
         case e: Throwable =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 86bb40fde2f8..f007ab2e8b52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -49,11 +49,7 @@ case class SparkListenerSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
     modifiedConfigs: Map[String, String] = Map.empty)
-  extends SparkListenerEvent {
-
-  // The `QueryExecution` instance that represents the SQL execution
-  @JsonIgnore private[sql] var qe: QueryExecution = null
-}
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 92c3ec888d6c..164710cdd883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.diagnostic.DiagnosticListener
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution
@@ -119,12 +118,6 @@ private[sql] class SharedState(
     statusStore
   }
 
-  sparkContext.statusStore.diskStore.foreach { kvStore =>
-    sparkContext.listenerBus.addToQueue(
-      new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
-      DiagnosticListener.QUEUE_NAME)
-  }
-
   /**
    * A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
   * data to show.
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
index 6c727c4369d8..747c05b9b062 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
@@ -31,15 +31,4 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
   def sqlList(
       @PathParam("appId") appId: String,
       @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]
-
-  @Path("applications/{appId}/diagnostics/sql")
-  def sqlDiagnosticsList(
-      @PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
-    classOf[SQLDiagnosticResource]
-
-  @Path("applications/{appId}/{attemptId}/diagnostics/sql")
-  def sqlDiagnosticsList(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] =
-    classOf[SQLDiagnosticResource]
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
deleted file mode 100644
index 8a6c81ced741..000000000000
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.sql
-
-import java.util.Date
-import javax.ws.rs._
-
-import org.apache.spark.sql.diagnostic._
-import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
-
-private[v1] class SQLDiagnosticResource extends BaseAppResource {
-
-  @GET
-  def sqlDiagnosticList(
-      @DefaultValue("0") @QueryParam("offset") offset: Int,
-      @DefaultValue("20") @QueryParam("length") length: Int): Seq[SQLDiagnosticData] = {
-    withUI { ui =>
-      ui.store.diskStore.map { kvStore =>
-        val store = new DiagnosticStore(kvStore)
-        store.diagnosticsList(offset, length)
-          // Do not display the plan changes in the list
-          .map(d => prepareSqlDiagnosticData(d, Seq.empty))
-      }.getOrElse(Seq.empty)
-    }
-  }
-
-  @GET
-  @Path("{executionId:\\d+}")
-  def sqlDiagnostic(
-      @PathParam("executionId") execId: Long): SQLDiagnosticData = {
-    withUI { ui =>
-      ui.store.diskStore.flatMap { kvStore =>
-        val store = new DiagnosticStore(kvStore)
-        val updates = store.adaptiveExecutionUpdates(execId)
-        store.diagnostic(execId)
-          .map(d => prepareSqlDiagnosticData(d, updates))
-      }.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
-    }
-  }
-
-  private def prepareSqlDiagnosticData(
-      diagnostic: ExecutionDiagnosticData,
-      updates: Seq[AdaptiveExecutionUpdate]): SQLDiagnosticData = {
-    new SQLDiagnosticData(
-      diagnostic.executionId,
-      diagnostic.physicalPlan,
-      new Date(diagnostic.submissionTime),
-      diagnostic.completionTime.map(t => new Date(t)),
-      diagnostic.errorMessage,
-      updates.map(u => AdaptivePlanChange(new Date(u.updateTime), u.physicalPlan)))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 3cafc10352f0..0ddf66718bce 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -40,13 +40,3 @@ case class Node private[spark](
     metrics: Seq[Metric])
 
 case class Metric private[spark] (name: String, value: String)
-
-class SQLDiagnosticData private[spark] (
-    val id: Long,
-    val physicalPlan: String,
-    val submissionTime: Date,
-    val completionTime: Option[Date],
-    val errorMessage: Option[String],
-    val planChanges: Seq[AdaptivePlanChange])
-
-case class AdaptivePlanChange(updateTime: Date, physicalPlan: String)
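For context, after this change `QueryExecution.explainString` accepts only an `ExplainMode`, and plan-string truncation is always governed by `spark.sql.debug.maxToStringFields` (default 25); the removed per-call `maxFields` parameter and `spark.sql.debug.maxToStringFieldsForDiagnostic` are gone. A minimal caller sketch, not part of this patch, assuming a running `spark` session and a hypothetical example query:

    import org.apache.spark.sql.execution.ExplainMode

    // Hypothetical example query; any Dataset behaves the same way.
    val qe = spark.range(10).selectExpr("id * 2 AS doubled").queryExecution
    // Only the explain mode is passed now; field truncation follows
    // spark.sql.debug.maxToStringFields rather than a maxFields argument.
    val planText = qe.explainString(ExplainMode.fromString("formatted"))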