diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index fa4904b93021..0489619da40f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -173,7 +173,8 @@ private[spark] object Config extends Logging {
 
   object ExecutorRollPolicy extends Enumeration {
     val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
-      PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, OUTLIER, OUTLIER_NO_FALLBACK = Value
+      PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, DISK_USED,
+      OUTLIER, OUTLIER_NO_FALLBACK = Value
   }
 
   val EXECUTOR_ROLL_POLICY =
@@ -192,6 +193,7 @@ private[spark] object Config extends Logging {
         "PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " +
         "memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " +
         "off-heap memory. " +
+        "DISK_USED policy chooses an executor with the biggest used disk size. " +
         "OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
         "at least two standard deviation from the mean in average task time, " +
         "total task time, total task GC time, and the number of failed tasks if exists. " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index b53c9b69d15d..d0d30980fc1c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -128,6 +128,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
           listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse
         case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY =>
           listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse
+        case ExecutorRollPolicy.DISK_USED =>
+          listWithoutDriver.sortBy(_.diskUsed).reverse
         case ExecutorRollPolicy.OUTLIER =>
           // If there is no outlier we fallback to TOTAL_DURATION policy.
           outliersFromMultipleDimensions(listWithoutDriver) ++
@@ -151,7 +153,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
       outliers(listWithoutDriver, e => e.totalGCTime) ++
       outliers(listWithoutDriver, e => e.failedTasks) ++
       outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
-      outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory"))
+      outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory")) ++
+      outliers(listWithoutDriver, e => e.diskUsed)
 
   /**
    * Return executors whose metrics is outstanding, '(value - mean) > 2-sigma'.
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
index e019834677da..5600efa0a69e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
@@ -141,10 +141,19 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1201L))),
     Map(), Map(), 1, false, Set())
 
+  val execWithBiggestDiskUsed = new ExecutorSummary("13", "host:port", true, 1,
+    10, 15, 1, 1, 1,
+    4, 0, 2, 280,
+    30, 100, 100,
+    10, false, 20, new Date(1639300001000L),
+    Option.empty, Option.empty, Map(), Option.empty, Set(),
+    metrics, Map(), Map(), 1, false, Set())
+
   val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
     execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
     execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
-    execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory)
+    execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory,
+    execWithBiggestDiskUsed)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -214,6 +223,10 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
       _choose(list, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)).contains("12"))
   }
 
+  test("Policy: DISK_USED") {
+    assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.DISK_USED)).contains("13"))
+  }
+
   test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
     assert(
       plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)) ==
@@ -362,4 +375,17 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
       plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
       plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
   }
+
+  test("Policy: OUTLIER_NO_FALLBACK - Detect an used disk outlier") {
+    val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+      0, 100000, 1, 0, 0,
+      3, 0, 1, 100,
+      0, 0, 0,
+      0, false, 0, new Date(1639300001000L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(),
+      metrics, Map(), Map(), 1, false, Set())
+    assert(
+      plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.DISK_USED)) ==
+      plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
+  }
 }