From f513285d7e7a1818289745498c4d49559ee3cf74 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Wed, 20 Feb 2019 19:27:49 +0800 Subject: [PATCH 1/7] Fix incorrect computation of maxNumExecutorFailures for streaming --- .../spark/internal/config/package.scala | 28 +++++++++++++++++++ .../scala/org/apache/spark/util/Utils.scala | 6 ++++ .../spark/deploy/yarn/ApplicationMaster.scala | 4 ++- .../scheduler/ExecutorAllocationManager.scala | 6 ++-- 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 850d6845684b..c7d14fe97cc2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -351,6 +351,34 @@ package object config { ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout") .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT) + private[streaming] val STREAMING_DYN_ALLOCATION_ENABLED = + ConfigBuilder("spark.streaming.dynamicAllocation.enabled") + .booleanConf.createWithDefault(false) + + private[streaming] val STREAMING_DYN_ALLOCATION_TESTING = + ConfigBuilder("spark.streaming.dynamicAllocation.testing") + .booleanConf.createWithDefault(false) + + private[streaming] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS = + ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors") + .intConf.createWithDefault(0) + + private[streaming] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS = + ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors") + .intConf.createWithDefault(Int.MaxValue) + + private[streaming] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL = + ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval") + .timeConf(TimeUnit.SECONDS).createWithDefault(60) + + private[streaming] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO = + ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio") + .doubleConf.createWithDefault(0.9) + + private[streaimg] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO = + ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio") + .doubleConf.createWithDefault(0.3) + private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2d00453159d6..f80033fd2918 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2490,6 +2490,12 @@ private[spark] object Utils extends Logging { (!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING)) } + def isStreamingDynamicAllocationEnabled(conf: SparkConf): Boolean = { + val streamingDynamicAllocationEnabled = conf.get(STREAMING_DYN_ALLOCATION_ENABLED) + streamingDynamicAllocationEnabled && + (!isLocalMaster(conf) || conf.get(STREAMING_DYN_ALLOCATION_TESTING)) + } + /** * Return the initial number of executors for dynamic allocation. 
*/ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 91e1fbd17ff5..201f7840c1df 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -100,7 +100,9 @@ private[spark] class ApplicationMaster( private val maxNumExecutorFailures = { val effectiveNumExecutors = - if (Utils.isDynamicAllocationEnabled(sparkConf)) { + if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) { + sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS) + } else if (Utils.isDynamicAllocationEnabled(sparkConf)) { sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) } else { sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 8717555dea49..94c1c78ff839 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -206,7 +206,8 @@ private[streaming] object ExecutorAllocationManager extends Logging { val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors" def isDynamicAllocationEnabled(conf: SparkConf): Boolean = { - val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false) + val streamingDynamicAllocationEnabled = Utils.isStreamingDynamicAllocationEnabled(conf) + Utils.isStreamingDynamicAllocationEnabled(conf) if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) { throw new IllegalArgumentException( """ @@ -215,8 +216,7 @@ private[streaming] object ExecutorAllocationManager extends Logging { |false to use Dynamic Allocation in streaming. 
""".stripMargin) } - val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false) - streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing) + streamingDynamicAllocationEnabled } def createIfEnabled( From 1b1e4d098c3e42a93d20a2a17bab50c09810e6f1 Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Wed, 20 Feb 2019 21:24:48 +0800 Subject: [PATCH 2/7] Replace hardcoded configs with ConfigEntry --- .../spark/internal/config/package.scala | 2 +- .../scheduler/ExecutorAllocationManager.scala | 49 +++++++------------ 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index c7d14fe97cc2..8d433a11b6eb 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -361,7 +361,7 @@ package object config { private[streaming] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors") - .intConf.createWithDefault(0) + .intConf.createOptional private[streaming] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS = ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 94c1c78ff839..a93cf47bb945 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -22,6 +22,7 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -57,15 +58,12 @@ private[streaming] class ExecutorAllocationManager( import ExecutorAllocationManager._ - private val scalingIntervalSecs = conf.getTimeAsSeconds( - SCALING_INTERVAL_KEY, - s"${SCALING_INTERVAL_DEFAULT_SECS}s") - private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT) - private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT) - private val minNumExecutors = conf.getInt( - MIN_EXECUTORS_KEY, - math.max(1, receiverTracker.numReceivers)) - private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE) + private val scalingIntervalSecs = conf.get(STREAMING_DYN_ALLOCATION_SCALING_INTERVAL) + private val scalingUpRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO) + private val scalingDownRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO) + private val minNumExecutors = conf.get(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS) + .getOrElse(math.max(1, receiverTracker.numReceivers())) + private val maxNumExecutors = conf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS) private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000, _ => manageAllocation(), "streaming-executor-allocation-manager") @@ -152,32 +150,35 @@ private[streaming] class ExecutorAllocationManager( private def validateSettings(): Unit = { require( scalingIntervalSecs > 0, - s"Config $SCALING_INTERVAL_KEY must be more than 0") + s"Config ${STREAMING_DYN_ALLOCATION_SCALING_INTERVAL.key} must be more than 0") 
require( scalingUpRatio > 0, - s"Config $SCALING_UP_RATIO_KEY must be more than 0") + s"Config ${STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.key} must be more than 0") require( scalingDownRatio > 0, - s"Config $SCALING_DOWN_RATIO_KEY must be more than 0") + s"Config ${STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.key} must be more than 0") require( minNumExecutors > 0, - s"Config $MIN_EXECUTORS_KEY must be more than 0") + s"Config ${STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key} must be more than 0") require( maxNumExecutors > 0, - s"$MAX_EXECUTORS_KEY must be more than 0") + s"${STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key} must be more than 0") require( scalingUpRatio > scalingDownRatio, - s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY") + s"Config ${STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.key} must be more than config " + + s"${STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.key}") - if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) { + if (conf.contains(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key) && + conf.contains(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key)) { require( maxNumExecutors >= minNumExecutors, - s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY") + s"Config ${STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key} must be more than config " + + s"${STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key}") } } @@ -190,20 +191,6 @@ private[streaming] class ExecutorAllocationManager( } private[streaming] object ExecutorAllocationManager extends Logging { - val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled" - - val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval" - val SCALING_INTERVAL_DEFAULT_SECS = 60 - - val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio" - val SCALING_UP_RATIO_DEFAULT = 0.9 - - val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio" - val SCALING_DOWN_RATIO_DEFAULT = 0.3 - - val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors" - - val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors" def isDynamicAllocationEnabled(conf: SparkConf): Boolean = { val streamingDynamicAllocationEnabled = Utils.isStreamingDynamicAllocationEnabled(conf) From ddc531271e8779ab15f57f6d5a613e8f0aab0668 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Thu, 21 Feb 2019 11:07:10 +0800 Subject: [PATCH 3/7] Refine as commented --- .../spark/internal/config/package.scala | 31 ++++++++++++++----- .../scheduler/ExecutorAllocationManager.scala | 21 ------------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8d433a11b6eb..34c0567d59c6 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -353,31 +353,48 @@ package object config { private[streaming] val STREAMING_DYN_ALLOCATION_ENABLED = ConfigBuilder("spark.streaming.dynamicAllocation.enabled") - .booleanConf.createWithDefault(false) + .booleanConf + .createWithDefault(false) private[streaming] val STREAMING_DYN_ALLOCATION_TESTING = ConfigBuilder("spark.streaming.dynamicAllocation.testing") - .booleanConf.createWithDefault(false) + .booleanConf + .createWithDefault(false) private[streaming] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors") - 
.intConf.createOptional + .intConf + .checkValue(_ > 0, "The min executor number of streaming dynamic " + + "allocation must be positive.") + .createOptional private[streaming] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS = ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors") - .intConf.createWithDefault(Int.MaxValue) + .intConf + .checkValue(_ > 0, "The max executor number of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(Int.MaxValue) private[streaming] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL = ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval") - .timeConf(TimeUnit.SECONDS).createWithDefault(60) + .timeConf(TimeUnit.SECONDS) + .checkValue(_ > 0, "The scaling interval of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(60) private[streaming] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO = ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio") - .doubleConf.createWithDefault(0.9) + .doubleConf + .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(0.9) private[streaimg] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO = ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio") - .doubleConf.createWithDefault(0.3) + .doubleConf + .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(0.3) private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .timeConf(TimeUnit.MILLISECONDS) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index a93cf47bb945..97dc3231fd0a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -148,26 +148,6 @@ private[streaming] class ExecutorAllocationManager( } private def validateSettings(): Unit = { - require( - scalingIntervalSecs > 0, - s"Config ${STREAMING_DYN_ALLOCATION_SCALING_INTERVAL.key} must be more than 0") - - require( - scalingUpRatio > 0, - s"Config ${STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.key} must be more than 0") - - require( - scalingDownRatio > 0, - s"Config ${STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.key} must be more than 0") - - require( - minNumExecutors > 0, - s"Config ${STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key} must be more than 0") - - require( - maxNumExecutors > 0, - s"${STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key} must be more than 0") - require( scalingUpRatio > scalingDownRatio, s"Config ${STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.key} must be more than config " + @@ -194,7 +174,6 @@ private[streaming] object ExecutorAllocationManager extends Logging { def isDynamicAllocationEnabled(conf: SparkConf): Boolean = { val streamingDynamicAllocationEnabled = Utils.isStreamingDynamicAllocationEnabled(conf) - Utils.isStreamingDynamicAllocationEnabled(conf) if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) { throw new IllegalArgumentException( """ From bd21ba303dd7e15243ff2fda7e24fb9d29818c4a Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Tue, 26 Feb 2019 18:54:46 +0800 Subject: [PATCH 4/7] fix streaming is not an enclosing class --- .../org/apache/spark/internal/config/package.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 34c0567d59c6..769b0c6e34a3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -351,45 +351,45 @@ package object config {
     ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout")
       .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)

-  private[streaming] val STREAMING_DYN_ALLOCATION_ENABLED =
+  private[spark] val STREAMING_DYN_ALLOCATION_ENABLED =
     ConfigBuilder("spark.streaming.dynamicAllocation.enabled")
       .booleanConf
       .createWithDefault(false)

-  private[streaming] val STREAMING_DYN_ALLOCATION_TESTING =
+  private[spark] val STREAMING_DYN_ALLOCATION_TESTING =
     ConfigBuilder("spark.streaming.dynamicAllocation.testing")
       .booleanConf
       .createWithDefault(false)

-  private[streaming] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS =
+  private[spark] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS =
     ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors")
       .intConf
       .checkValue(_ > 0, "The min executor number of streaming dynamic " +
         "allocation must be positive.")
       .createOptional

-  private[streaming] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS =
+  private[spark] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors")
       .intConf
       .checkValue(_ > 0, "The max executor number of streaming dynamic " +
         "allocation must be positive.")
       .createWithDefault(Int.MaxValue)

-  private[streaming] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL =
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL =
     ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval")
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ > 0, "The scaling interval of streaming dynamic " +
         "allocation must be positive.")
       .createWithDefault(60)

-  private[streaming] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO =
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO =
     ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio")
       .doubleConf
       .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " +
         "allocation must be positive.")
       .createWithDefault(0.9)

-  private[streaimg] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO =
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO =
     ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio")
       .doubleConf
       .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " +
         "allocation must be positive.")
       .createWithDefault(0.3)

From 882bd352d642cf11e5b29cc61028720c8781b44f Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Tue, 5 Mar 2019 14:59:18 +0800
Subject: [PATCH 5/7] Fix ExecutorAllocationManagerSuite
---
 .../ExecutorAllocationManagerSuite.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index fcbba006a816..f3a08603abe7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.scalatest.time.SpanSugar._
 import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
-import
org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} +import org.apache.spark.internal.config._ import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext} import org.apache.spark.util.{ManualClock, Utils} @@ -58,7 +58,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite reset(allocationClient) when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2")) addBatchProcTime(allocationManager, batchProcTimeMs.toLong) - val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1 + val advancedTime = STREAMING_DYN_ALLOCATION_SCALING_INTERVAL.defaultValue.get * 1000 + 1 val expectedWaitTime = clock.getTimeMillis() + advancedTime clock.advance(advancedTime) // Make sure ExecutorAllocationManager.manageAllocation is called @@ -101,25 +101,29 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite } // Batch proc time slightly more than the scale up ratio, should increase allocation by 1 - addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT + 1) { + addBatchProcTimeAndVerifyAllocation( + batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(Some(3)) verifyKilledExec(None) } // Batch proc time slightly less than the scale up ratio, should not change allocation - addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT - 1) { + addBatchProcTimeAndVerifyAllocation( + batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) verifyKilledExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation - addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT + 1) { + addBatchProcTimeAndVerifyAllocation( + batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(None) verifyKilledExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation - addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT - 1) { + addBatchProcTimeAndVerifyAllocation( + batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) verifyKilledExec(Some("2")) } From 2f46380d62ceac964e513848b273bee56915f088 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Wed, 13 Mar 2019 10:37:43 +0800 Subject: [PATCH 6/7] Moving streaming configs to Streaming.scala --- .../spark/internal/config/package.scala | 45 ------------------- .../scala/org/apache/spark/util/Utils.scala | 1 + .../spark/deploy/yarn/ApplicationMaster.scala | 1 + .../scheduler/ExecutorAllocationManager.scala | 4 +- .../ExecutorAllocationManagerSuite.scala | 5 +-- 5 files changed, 5 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 769b0c6e34a3..850d6845684b 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -351,51 +351,6 @@ package object config { ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout") .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT) - private[spark] val STREAMING_DYN_ALLOCATION_ENABLED = - ConfigBuilder("spark.streaming.dynamicAllocation.enabled") - .booleanConf - .createWithDefault(false) - - 
private[spark] val STREAMING_DYN_ALLOCATION_TESTING = - ConfigBuilder("spark.streaming.dynamicAllocation.testing") - .booleanConf - .createWithDefault(false) - - private[spark] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS = - ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors") - .intConf - .checkValue(_ > 0, "The min executor number of streaming dynamic " + - "allocation must be positive.") - .createOptional - - private[spark] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS = - ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors") - .intConf - .checkValue(_ > 0, "The max executor number of streaming dynamic " + - "allocation must be positive.") - .createWithDefault(Int.MaxValue) - - private[spark] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL = - ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval") - .timeConf(TimeUnit.SECONDS) - .checkValue(_ > 0, "The scaling interval of streaming dynamic " + - "allocation must be positive.") - .createWithDefault(60) - - private[spark] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO = - ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio") - .doubleConf - .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " + - "allocation must be positive.") - .createWithDefault(0.9) - - private[spark] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO = - ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio") - .doubleConf - .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " + - "allocation must be positive.") - .createWithDefault(0.3) - private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f80033fd2918..03986254f0c3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -60,6 +60,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Streaming._ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 201f7840c1df..9ed3b78986c0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -45,6 +45,7 @@ import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.rpc._ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 97dc3231fd0a..e85a3b9009c3 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Streaming._ import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -56,8 +56,6 @@ private[streaming] class ExecutorAllocationManager( batchDurationMs: Long, clock: Clock) extends StreamingListener with Logging { - import ExecutorAllocationManager._ - private val scalingIntervalSecs = conf.get(STREAMING_DYN_ALLOCATION_SCALING_INTERVAL) private val scalingUpRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO) private val scalingDownRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index f3a08603abe7..22d027d9b818 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -25,7 +25,8 @@ import org.scalatest.mockito.MockitoSugar import org.scalatest.time.SpanSugar._ import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite} -import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} +import org.apache.spark.internal.config.Streaming._ import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext} import org.apache.spark.util.{ManualClock, Utils} @@ -33,8 +34,6 @@ import org.apache.spark.util.{ManualClock, Utils} class ExecutorAllocationManagerSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterAll with MockitoSugar with PrivateMethodTester { - import ExecutorAllocationManager._ - private val batchDurationMillis = 1000L private var allocationClient: ExecutorAllocationClient = null private var clock: StreamManualClock = null From a46d4d382b3e4d385b3f86c78800b07d04164073 Mon Sep 17 00:00:00 2001 From: Liupengcheng Date: Sat, 16 Mar 2019 17:46:59 +0800 Subject: [PATCH 7/7] Add missing Streaming.scala --- .../spark/internal/config/Streaming.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/internal/config/Streaming.scala diff --git a/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala b/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala new file mode 100644 index 000000000000..6e58c090e812 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.config + +import java.util.concurrent.TimeUnit + +private[spark] object Streaming { + + private[spark] val STREAMING_DYN_ALLOCATION_ENABLED = + ConfigBuilder("spark.streaming.dynamicAllocation.enabled") + .booleanConf + .createWithDefault(false) + + private[spark] val STREAMING_DYN_ALLOCATION_TESTING = + ConfigBuilder("spark.streaming.dynamicAllocation.testing") + .booleanConf + .createWithDefault(false) + + private[spark] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS = + ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors") + .intConf + .checkValue(_ > 0, "The min executor number of streaming dynamic " + + "allocation must be positive.") + .createOptional + + private[spark] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS = + ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors") + .intConf + .checkValue(_ > 0, "The max executor number of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(Int.MaxValue) + + private[spark] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL = + ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval") + .timeConf(TimeUnit.SECONDS) + .checkValue(_ > 0, "The scaling interval of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(60) + + private[spark] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO = + ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio") + .doubleConf + .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(0.9) + + private[spark] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO = + ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio") + .doubleConf + .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " + + "allocation must be positive.") + .createWithDefault(0.3) +}
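Net effect of the series: with spark.streaming.dynamicAllocation.enabled set, the YARN ApplicationMaster now sizes its executor-failure tolerance from the streaming ceiling (spark.streaming.dynamicAllocation.maxExecutors) instead of falling through to spark.executor.instances, which is typically unset (treated as 0) for such jobs. A simplified sketch of the resulting computation follows; the math.max(3, 2 * n) default, the overflow guard, and the spark.yarn.max.executor.failures override are assumptions about the surrounding ApplicationMaster code, not shown in the diffs above:

// Illustrative sketch only, not the Spark source.
object MaxFailuresSketch {
  def maxNumExecutorFailures(
      streamingDynAllocEnabled: Boolean, // spark.streaming.dynamicAllocation.enabled
      dynAllocEnabled: Boolean,          // spark.dynamicAllocation.enabled
      streamingMaxExecutors: Int,        // streaming ceiling, defaults to Int.MaxValue
      coreMaxExecutors: Int,             // core ceiling, defaults to Int.MaxValue
      executorInstances: Int,            // spark.executor.instances, or 0 if unset
      configuredMax: Option[Int]         // spark.yarn.max.executor.failures, if set
    ): Int = {
    // Selection order matches the patched ApplicationMaster: the streaming
    // ceiling wins when streaming dynamic allocation is on (the two modes are
    // mutually exclusive per isDynamicAllocationEnabled).
    val effectiveNumExecutors =
      if (streamingDynAllocEnabled) streamingMaxExecutors
      else if (dynAllocEnabled) coreMaxExecutors
      else executorInstances
    // Both ceilings default to Int.MaxValue, so guard 2 * n against overflow.
    val doubled =
      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue
      else 2 * effectiveNumExecutors
    configuredMax.getOrElse(math.max(3, doubled))
  }
}

Before this fix, a streaming job with dynamic allocation enabled landed in the executorInstances branch and, under the assumed default above, tolerated only math.max(3, 2 * 0) = 3 executor failures before the whole application was failed.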