core/src/main/scala/org/apache/spark/internal/config/Status.scala

@@ -70,12 +70,4 @@ private[spark] object Status {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(false)
-
-  val DISK_STORE_DIR_FOR_STATUS =
-    ConfigBuilder("spark.appStatusStore.diskStoreDir")
-      .doc("Local directory where to store diagnostic information of SQL executions. " +
-        "This configuration is only for live UI.")
-      .version("3.4.0")
-      .stringConf
-      .createOptional
 }
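The entry removed above was declared with `createOptional`, so call sites read it back as an Option[String]. A minimal sketch of that declare-and-read pattern, with a made-up key that is not a real Spark setting:

    package org.apache.spark.internal.config

    // Hypothetical config object, for illustration only.
    private[spark] object ExampleConfig {
      val EXAMPLE_DIR =
        ConfigBuilder("spark.example.dir") // made-up key
          .doc("Example of an optional string setting.")
          .version("3.4.0")
          .stringConf
          .createOptional
    }

    // Reading returns Option[String]: None unless the user set the key,
    // which is why the reverted code below used conf.get(...).flatMap { ... }.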
27 changes: 3 additions & 24 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,17 +17,12 @@

 package org.apache.spark.status

-import java.io.File
-import java.nio.file.Files
 import java.util.{List => JList}

 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
-import scala.util.control.NonFatal

 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._

@@ -39,7 +34,6 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
-    val diskStore: Option[KVStore] = None,
     val listener: Option[AppStatusListener] = None) {

   def applicationInfo(): v1.ApplicationInfo = {

@@ -765,33 +759,18 @@ private[spark] class AppStatusStore(
     }
   }

-private[spark] object AppStatusStore extends Logging {
+private[spark] object AppStatusStore {

   val CURRENT_VERSION = 2L

   /**
-   * Create an in-memory store for a live application. also create a disk store if
-   * the `spark.appStatusStore.diskStore.dir` is set
+   * Create an in-memory store for a live application.
    */
   def createLiveStore(
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
-    // create a disk-based kv store if the directory is set
-    val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
-      val storePath = Files.createDirectories(
-        new File(storeDir, System.currentTimeMillis().toString).toPath
-      ).toFile
-      try {
-        Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf))
-          .map(new ElementTrackingStore(_, conf))
-      } catch {
-        case NonFatal(e) =>
-          logWarning("Failed to create disk-based app status store: ", e)
-          None
-      }
-    }
-    new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
+    new AppStatusStore(store, listener = Some(listener))
   }
 }
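A quick sketch of the factory after the revert: it wires a single in-memory ElementTrackingStore plus the listener, with no disk-backed sibling. The standalone setup below is illustrative; inside Spark this call is made by SparkContext:

    import org.apache.spark.SparkConf
    import org.apache.spark.status.AppStatusStore

    // Illustrative only: AppStatusStore is private[spark], so real callers
    // live inside the org.apache.spark package tree.
    val conf = new SparkConf().setAppName("demo").setMaster("local[*]")
    val statusStore = AppStatusStore.createLiveStore(conf)
    // statusStore.store is the ElementTrackingStore over an InMemoryStore;
    // statusStore.listener is the AppStatusListener to attach to the bus.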
11 changes: 0 additions & 11 deletions docs/monitoring.md
@@ -630,17 +630,6 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     <code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand for the given query when Physical Plan size is high.
   </td>
 </tr>
-<tr>
-  <td><code>/applications/[app-id]/diagnostics/sql/[execution-id]</code></td>
-  <td>Diagnostic for the given query, including:
-  <br>
-  1. plan change history of adaptive execution
-  <br>
-  2. physical plan description with unlimited fields
-  <br>
-  This API requires setting <code>spark.appStatusStore.diskStoreDir</code> for storing the diagnostic information.
-  </td>
-</tr>
 <tr>
   <td><code>/applications/[app-id]/environment</code></td>
   <td>Environment details of the given application.</td>
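The endpoints that remain are plain HTTP GETs under /api/v1; a small Scala sketch of hitting one of them (host, port, and application id are placeholders):

    import scala.io.Source

    // A live UI usually serves on 4040; a history server on 18080.
    val base  = "http://localhost:4040/api/v1"
    val appId = "app-20240101000000-0000" // placeholder id

    // Environment details endpoint, which this PR keeps.
    val env = Source.fromURL(s"$base/applications/$appId/environment").mkString
    println(env.take(500))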
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -3373,15 +3373,6 @@ object SQLConf {
       .intConf
       .createWithDefault(25)

-  val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
-    buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
-      .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect when the " +
-        s"output will be stored for the diagnostics API. The output will be stored in " +
-        s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS.key}")
-      .version("3.4.0")
-      .intConf
-      .createWithDefault(10000)
-
   val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
     .doc("Maximum number of characters to output for a plan string. If the plan is " +
       "longer, further output will be truncated. The default setting always generates a full " +

@@ -4737,8 +4728,6 @@ class SQLConf extends Serializable with Logging {

   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)

-  def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)
-
   def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt

   def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
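With the diagnostic variant gone, `spark.sql.debug.maxToStringFields` is the one limit governing how many fields a plan string prints before eliding the rest. A sketch of its observable effect (the session and DataFrame are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // With a small limit, wide schemas in explain() output are truncated
    // and end with a "... N more fields" marker.
    spark.conf.set("spark.sql.debug.maxToStringFields", 3)
    spark.range(5)
      .selectExpr("id", "id AS a", "id AS b", "id AS c", "id AS d")
      .explain()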

This file was deleted.

This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

@@ -213,11 +213,9 @@ class QueryExecution(
append("\n")
}

def explainString(
mode: ExplainMode,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
def explainString(mode: ExplainMode): String = {
val concat = new PlanStringConcat()
explainString(mode, maxFields, concat.append)
explainString(mode, SQLConf.get.maxToStringFields, concat.append)
withRedaction {
concat.toString
}
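After this change the method takes only the mode and reads the field limit from the active SQLConf itself; callers can no longer pass a custom maxFields. A usage sketch, assuming an active SparkSession named `spark`:

    import org.apache.spark.sql.execution.ExplainMode

    val df = spark.range(10).filter("id > 5")
    // The field limit now comes from SQLConf.get.maxToStringFields.
    val planText = df.queryExecution.explainString(ExplainMode.fromString("extended"))
    println(planText)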
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

@@ -96,7 +96,7 @@ object SQLExecution {
     var ex: Option[Throwable] = None
     val startTime = System.nanoTime()
     try {
-      val event = SparkListenerSQLExecutionStart(
+      sc.listenerBus.post(SparkListenerSQLExecutionStart(
         executionId = executionId,
         description = desc,
         details = callSite.longForm,

@@ -105,9 +105,7 @@

         // will be caught and reported in the `SparkListenerSQLExecutionEnd`
         sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
         time = System.currentTimeMillis(),
-        redactedConfigs)
-      event.qe = queryExecution
-      sc.listenerBus.post(event)
+        redactedConfigs))
       body
     } catch {
       case e: Throwable =>
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala

@@ -49,11 +49,7 @@ case class SparkListenerSQLExecutionStart(
     sparkPlanInfo: SparkPlanInfo,
     time: Long,
     modifiedConfigs: Map[String, String] = Map.empty)
-  extends SparkListenerEvent {
-
-  // The `QueryExecution` instance that represents the SQL execution
-  @JsonIgnore private[sql] var qe: QueryExecution = null
-}
+  extends SparkListenerEvent

 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(
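With the event reverted to a plain case class, the mutable QueryExecution handle is gone, and consumers observe the event the standard way, through onOtherEvent. A minimal hypothetical listener:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

    // Hypothetical listener; register it via sparkContext.addSparkListener(...).
    class ExecutionStartLogger extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart =>
          println(s"SQL execution ${e.executionId} started: ${e.description}")
        case _ => // ignore everything else
      }
    }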
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.diagnostic.DiagnosticListener
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution

@@ -119,12 +118,6 @@ private[sql] class SharedState(
     statusStore
   }

-  sparkContext.statusStore.diskStore.foreach { kvStore =>
-    sparkContext.listenerBus.addToQueue(
-      new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
-      DiagnosticListener.QUEUE_NAME)
-  }
-
   /**
    * A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
    * data to show.
sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala

@@ -31,15 +31,4 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
   def sqlList(
       @PathParam("appId") appId: String,
       @PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]
-
-  @Path("applications/{appId}/diagnostics/sql")
-  def sqlDiagnosticsList(
-      @PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
-    classOf[SQLDiagnosticResource]
-
-  @Path("applications/{appId}/{attemptId}/diagnostics/sql")
-  def sqlDiagnosticsList(
-      @PathParam("appId") appId: String,
-      @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] =
-    classOf[SQLDiagnosticResource]
 }
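The surviving endpoint uses the JAX-RS sub-resource locator idiom: a @Path method returning a Class hands the remaining URI segments to that resource for dispatch. A self-contained sketch with made-up names:

    import javax.ws.rs.{GET, Path, PathParam, Produces}
    import javax.ws.rs.core.MediaType

    // Made-up resources, for illustration only.
    @Path("/v1")
    class ExampleRootResource {
      // Sub-resource locator: the container instantiates ExampleSqlResource
      // and matches the rest of the path against it.
      @Path("applications/{appId}/sql")
      def sqlList(@PathParam("appId") appId: String): Class[ExampleSqlResource] =
        classOf[ExampleSqlResource]
    }

    @Produces(Array(MediaType.APPLICATION_JSON))
    class ExampleSqlResource {
      @GET
      def list(): Array[String] = Array.empty // would return execution summaries
    }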