core/src/main/scala/org/apache/spark/executor/Executor.scala (4 additions, 2 deletions)
@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
-import java.util.Properties
+import java.util.{Locale, Properties}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.GuardedBy
@@ -110,7 +110,9 @@ private[spark] class Executor(
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
-private val executorSource = new ExecutorSource(threadPool, executorId)
+private val schemes = conf.get(EXECUTOR_METRICS_FILESYSTEM_SCHEMES)
+  .toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty)
+private val executorSource = new ExecutorSource(threadPool, executorId, schemes)
// Pool used for threads that supervise task killing / cancellation
private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
// For tasks which are in the process of being killed, this map holds the most recently created
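The two added lines above normalize the configured scheme list before it reaches `ExecutorSource`. A minimal standalone sketch of that parsing (not part of the diff); `parseSchemes` and `rawValue` are hypothetical names standing in for the `conf.get(EXECUTOR_METRICS_FILESYSTEM_SCHEMES)` lookup:

```scala
import java.util.Locale

// Lower-case the raw config value, split on commas, trim whitespace around
// each entry, and drop empty entries (e.g. from a trailing comma).
def parseSchemes(rawValue: String): Array[String] =
  rawValue.toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty)

// parseSchemes("file, HDFS,s3a,") yields Array("file", "hdfs", "s3a")
```

Lower-casing keeps the configured values aligned with the lower-case scheme names Hadoop typically reports in its `FileSystem` statistics.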
core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -27,7 +27,10 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.spark.metrics.source.Source

private[spark]
-class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
+class ExecutorSource(
+    threadPool: ThreadPoolExecutor,
+    executorId: String,
+    fileSystemSchemes: Array[String]) extends Source {

private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme))
@@ -70,7 +73,7 @@ class ExecutorSource(
})

// Gauge for file system stats of this executor
-for (scheme <- Array("hdfs", "file")) {
+for (scheme <- fileSystemSchemes) {
registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
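`registerFileSystemStat` itself lies outside the hunk. Below is a hedged sketch of how such per-scheme gauges can be wired up with Dropwizard metrics and Hadoop's `FileSystem.Statistics`; the `filesystem.<scheme>.<stat>` metric naming and the class name `FileSystemSchemeGauges` are assumptions, not the actual implementation:

```scala
import scala.collection.JavaConverters._
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem

class FileSystemSchemeGauges(registry: MetricRegistry, fileSystemSchemes: Array[String]) {

  // Look up the statistics Hadoop has accumulated for a given scheme, if any.
  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics.asScala.find(_.getScheme.equals(scheme))

  // Register one gauge per (scheme, stat); fall back to a default when the
  // scheme has not been used yet and therefore has no statistics object.
  private def registerFileSystemStat[T](
      scheme: String, name: String, f: FileSystem.Statistics => T, default: T): Unit = {
    registry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(default)
    })
  }

  for (scheme <- fileSystemSchemes) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
  }
}
```

A scheme that is configured but never touched by the job simply reports the default values.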
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -271,6 +271,13 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("0")

+private[spark] val EXECUTOR_METRICS_FILESYSTEM_SCHEMES =
+  ConfigBuilder("spark.executor.metrics.fileSystemSchemes")
+    .doc("The file system schemes to report in executor metrics.")
+    .version("3.1.0")
+    .stringConf
+    .createWithDefaultString("file,hdfs")

private[spark] val EXECUTOR_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
.withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)
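As a usage sketch (not part of the diff), an application that also reads object storage could add the `s3a` scheme so its counters are reported alongside `file` and `hdfs`; the property name and its `file,hdfs` default come from the new config entry above:

```scala
import org.apache.spark.SparkConf

// Extend the default "file,hdfs" with an extra scheme; the executor splits the
// value on commas, trims whitespace, and lower-cases it before use.
val conf = new SparkConf()
  .set("spark.executor.metrics.fileSystemSchemes", "file,hdfs,s3a")
```

The same value can be passed on the command line, e.g. `--conf spark.executor.metrics.fileSystemSchemes=file,hdfs,s3a`.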
docs/monitoring.md (2 additions, 0 deletions)
@@ -1175,6 +1175,8 @@ This is the component with the largest amount of instrumented metrics
These metrics are exposed by Spark executors.

- namespace=executor (metrics are of type counter or gauge)
+- **notes:**
+- `spark.executor.metrics.fileSystemSchemes` (default: `file,hdfs`) determines the exposed file system metrics.
- bytesRead.count
- bytesWritten.count
- cpuTime.count