From dfbce912c7371afae5e8f87bf18b5a3d7dbfca52 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Mon, 13 Jul 2020 20:21:54 +0200
Subject: [PATCH 1/5] Initial upload

---
 .../spark/deploy/SparkSubmitArguments.scala        |  2 +-
 docs/configuration.md                              |  6 ++++--
 docs/monitoring.md                                 |  6 +++++-
 docs/spark-standalone.md                           |  5 ++++-
 .../spark/launcher/SparkClassCommandBuilder.java   |  3 +++
 .../launcher/SparkSubmitCommandBuilder.java        |  3 +++
 .../launcher/SparkSubmitCommandBuilderSuite.java   | 16 ++++++++++++++++
 .../src/main/dockerfiles/spark/entrypoint.sh       |  9 +++++++--
 8 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 3090a3b10a97c..a3765ac2215b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -249,7 +249,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       error("Driver memory must be a positive number")
     }
     if (executorMemory != null
-        && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
+        && Try(JavaUtils.byteStringAsMb(executorMemory)).getOrElse(-1L) <= 0) {
       error("Executor memory must be a positive number")
     }
     if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
diff --git a/docs/configuration.md b/docs/configuration.md
index 706c2552b1d17..67aa804d42dfc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -172,7 +172,8 @@ of the most common options to set are:
   <td>
     Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the
     same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t")
-    (e.g. 512m, 2g).
+    (e.g. 512m, 2g) using "m" as default suffix unit.
+d
     Note: In client mode, this config must not be set through the SparkConf
     directly in your application, because the driver JVM has already started at that point.
   </td>
@@ -249,7 +250,8 @@ of the most common options to set are:
   <td>1g</td>
   <td>
     Amount of memory to use per executor process, in the same format as JVM memory strings with
-    a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
+    a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g) using
+    "m" as default suffix unit.
   </td>
   <td>0.7.0</td>
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 32959b77c4773..67c9dbf2c814c 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -70,7 +70,11 @@ The history server can be configured as follows:
   <tr><th>Environment Variable</th><th>Meaning</th></tr>
   <tr>
     <td><code>SPARK_DAEMON_MEMORY</code></td>
-    <td>Memory to allocate to the history server (default: 1g).</td>
+    <td>
+      Memory to allocate to the history server (default: 1g). This can be configured with the same
+      format as JVM memory strings using a size unit suffix ("k", "m", "g" or "t") using "m" as
+      default suffix unit.
+    </td>
   </tr>
   <tr>
     <td><code>SPARK_DAEMON_JAVA_OPTS</code></td>
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index f3c479ba26547..a78a4e0732953 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -164,7 +164,10 @@ You can optionally configure the cluster further by setting environment variable
   </tr>
   <tr>
     <td><code>SPARK_DAEMON_MEMORY</code></td>
-    <td>Memory to allocate to the Spark master and worker daemons themselves (default: 1g).</td>
+    <td>
+      Memory to allocate to the Spark master and worker daemons themselves (default: 1g). This can be configured with the same
+      format as JVM memory strings using a size unit suffix ("k", "m", "g" or "t") using "m" as default suffix unit.
+    </td>
   </tr>
   <tr>
     <td><code>SPARK_DAEMON_JAVA_OPTS</code></td>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index fd056bb90e0c4..e5b9db0d70867 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -108,6 +108,9 @@ public List<String> buildCommand(Map<String, String> env)
     }

     String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
+    if (mem.chars().allMatch(Character::isDigit)) {
+      mem = mem + "m";
+    }
     cmd.add("-Xmx" + mem);
     cmd.add(className);
     cmd.addAll(classArgs);
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 383c3f60a595b..c34e4b73d5c53 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -285,6 +285,9 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
       isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
     String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
       System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
+    if (memory.chars().allMatch(Character::isDigit)) {
+      memory = memory + "m";
+    }
     cmd.add("-Xmx" + memory);
     addOptionString(cmd, driverDefaultJavaOptions);
     addOptionString(cmd, driverExtraJavaOptions);
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index 752e8d4c23f8b..a3254026a9241 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -115,6 +115,22 @@ public void testCliParser() throws Exception {
       Collections.indexOfSubList(cmd, Arrays.asList(parser.CONF, "spark.randomOption=foo")) > 0);
   }

+  @Test
+  public void testParserWithDefaultUnit() throws Exception {
+    List<String> sparkSubmitArgs = Arrays.asList(
+      parser.MASTER,
+      "local",
+      parser.DRIVER_MEMORY,
+      "4200",
+      parser.DRIVER_CLASS_PATH,
+      "/driverCp",
+      SparkLauncher.NO_RESOURCE);
+    Map<String, String> env = new HashMap<>();
+    List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+    assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx4200m"));
+  }
+
   @Test
   public void testShellCliParser() throws Exception {
     List<String> sparkSubmitArgs = Arrays.asList(
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 813a70c6e7ec3..fdcfe6767074b 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -78,11 +78,16 @@ case "$1" in
     ;;
   executor)
     shift 1
+    MEMORY_WITH_UNIT=$SPARK_EXECUTOR_MEMORY
+    if [[ $MEMORY_WITH_UNIT =~ ^[0-9]+$ ]]
+    then
+      MEMORY_WITH_UNIT="${MEMORY_WITH_UNIT}m"
+    fi
     CMD=(
       ${JAVA_HOME}/bin/java
       "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
-      -Xms$SPARK_EXECUTOR_MEMORY
-      -Xmx$SPARK_EXECUTOR_MEMORY
+      -Xms$MEMORY_WITH_UNIT
+      -Xmx$MEMORY_WITH_UNIT
       -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
       org.apache.spark.executor.CoarseGrainedExecutorBackend
       --driver-url $SPARK_DRIVER_URL
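A note on the shape of patch 1: the same defaulting rule is spelled out three times — in SparkClassCommandBuilder, in SparkSubmitCommandBuilder, and as the bash regex ^[0-9]+$ in the Kubernetes entrypoint. A minimal standalone sketch of that rule (withDefaultUnit is an illustrative name, not something the patch introduces):

    // The rule all three call sites share: an all-digit memory string carries no
    // unit, so "m" (MiB) is appended; anything else passes through untouched.
    public class DefaultUnitSketch {
      static String withDefaultUnit(String mem) {
        return mem.chars().allMatch(Character::isDigit) ? mem + "m" : mem;
      }

      public static void main(String[] args) {
        System.out.println(withDefaultUnit("4200")); // 4200m
        System.out.println(withDefaultUnit("2g"));   // 2g  (explicit unit kept)
        System.out.println(withDefaultUnit("512k")); // 512k
      }
    }

One subtlety: Character.isDigit also accepts non-ASCII digits, so the Java check and the bash [0-9] check are not strictly equivalent, although they agree for any realistic memory setting. Patch 2 below removes the Java-side duplication.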
From 2ddbe0c9bf3fd3b0ea2933ddddf83324fc7566b9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Tue, 14 Jul 2020 11:33:21 +0200
Subject: [PATCH 2/5] Extract common code into a util method

---
 docs/configuration.md                              |  5 ++---
 docs/monitoring.md                                 |  6 +++---
 docs/spark-standalone.md                           |  4 ++--
 .../apache/spark/launcher/CommandBuilderUtils.java | 11 +++++++++++
 .../spark/launcher/SparkClassCommandBuilder.java   |  5 +----
 .../spark/launcher/SparkSubmitCommandBuilder.java  |  5 +----
 .../launcher/SparkSubmitCommandBuilderSuite.java   |  2 +-
 7 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 67aa804d42dfc..a7b7f417dc16c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -172,8 +172,7 @@ of the most common options to set are:
   <td>
     Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the
     same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t")
-    (e.g. 512m, 2g) using "m" as default suffix unit.
-d
+    (e.g. 512m, 2g) using "m" as the default unit.
     Note: In client mode, this config must not be set through the SparkConf
     directly in your application, because the driver JVM has already started at that point.
@@ -251,7 +250,7 @@ d
   <td>1g</td>
   <td>
     Amount of memory to use per executor process, in the same format as JVM memory strings with
     a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g) using
-    "m" as default suffix unit.
+    "m" as the default unit.
   </td>
   <td>0.7.0</td>
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 67c9dbf2c814c..45edf922af521 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -71,9 +71,9 @@ The history server can be configured as follows:
     <td><code>SPARK_DAEMON_MEMORY</code></td>
     <td>
-      Memory to allocate to the history server (default: 1g). This can be configured with the same
-      format as JVM memory strings using a size unit suffix ("k", "m", "g" or "t") using "m" as
-      default suffix unit.
+      Memory to allocate to the history server (default: 1g). This can be configured in the same
+      format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") using "m" as
+      the default unit.
     </td>
   </tr>
   <tr>
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index a78a4e0732953..5d0039a018635 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -165,8 +165,8 @@ You can optionally configure the cluster further by setting environment variable
   <tr>
     <td><code>SPARK_DAEMON_MEMORY</code></td>
     <td>
-      Memory to allocate to the Spark master and worker daemons themselves (default: 1g). This can be configured with the same
-      format as JVM memory strings using a size unit suffix ("k", "m", "g" or "t") using "m" as default suffix unit.
+      Memory to allocate to the Spark master and worker daemons themselves (default: 1g). This can be configured in the same
+      format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") using "m" as the default unit.
     </td>
   </tr>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 172fb8c560876..4a9d982090cc6 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -328,4 +328,15 @@ static String findJarsDir(String sparkHome, String scalaVersion, boolean failIfN
     return libdir.getAbsolutePath();
   }

+  /**
+   * Add "m" as the default suffix unit when no explicit unit is given.
+   */
+  static String addDefaultMSuffixIfNeeded(String memoryString) {
+    if (memoryString.chars().allMatch(Character::isDigit)) {
+      return memoryString + "m";
+    } else {
+      return memoryString;
+    }
+  }
+
 }
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index e5b9db0d70867..4ce9071c5eae8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -108,10 +108,7 @@ public List<String> buildCommand(Map<String, String> env)
     }

     String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
-    if (mem.chars().allMatch(Character::isDigit)) {
-      mem = mem + "m";
-    }
-    cmd.add("-Xmx" + mem);
+    cmd.add("-Xmx" + addDefaultMSuffixIfNeeded(mem));
     cmd.add(className);
     cmd.addAll(classArgs);
     return cmd;
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index c34e4b73d5c53..b62130afb8edd 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -285,10 +285,7 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
       isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
     String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
       System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
-    if (memory.chars().allMatch(Character::isDigit)) {
-      memory = memory + "m";
-    }
-    cmd.add("-Xmx" + memory);
+    cmd.add("-Xmx" + addDefaultMSuffixIfNeeded(memory));
     addOptionString(cmd, driverDefaultJavaOptions);
     addOptionString(cmd, driverExtraJavaOptions);
     mergeEnvPathList(env, getLibPathEnvName(),
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index a3254026a9241..da221b2e0c667 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -128,7 +128,7 @@ public void testParserWithDefaultUnit() throws Exception {
     Map<String, String> env = new HashMap<>();
     List<String> cmd = buildCommand(sparkSubmitArgs, env);

-    assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx4200m"));
+    assertTrue("Driver -Xmx should be configured in MB by default.", cmd.contains("-Xmx4200m"));
   }

   @Test
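One edge case of the extracted helper may be worth keeping in mind: allMatch is vacuously true on an empty stream, so an empty input would come back as "m". The current call sites cannot hit this, because firstNonEmpty(..., DEFAULT_MEM) guarantees a non-empty argument, but any future caller would need the same guarantee. A small sketch demonstrating the behavior (the helper body is copied from the diff above; the real method is package-private in org.apache.spark.launcher.CommandBuilderUtils):

    public class EmptyInputSketch {
      // Local copy of the helper, for illustration only.
      static String addDefaultMSuffixIfNeeded(String memoryString) {
        if (memoryString.chars().allMatch(Character::isDigit)) {
          return memoryString + "m";
        } else {
          return memoryString;
        }
      }

      public static void main(String[] args) {
        // IntStream.allMatch on an empty stream is vacuously true:
        System.out.println(addDefaultMSuffixIfNeeded(""));     // prints: m
        System.out.println(addDefaultMSuffixIfNeeded("4200")); // prints: 4200m
      }
    }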
From cc495c1c45ac0648156b662fdc308287c79f3fdc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Wed, 15 Jul 2020 16:18:25 +0200
Subject: [PATCH 3/5] use byteStringAsMb for driverMemory

---
 .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index a3765ac2215b0..cdd59d4108524 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -245,7 +245,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       error("Must specify a primary resource (JAR or Python or R file)")
     }
     if (driverMemory != null
-        && Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
+        && Try(JavaUtils.byteStringAsMb(driverMemory)).getOrElse(-1L) <= 0) {
       error("Driver memory must be a positive number")
     }
     if (executorMemory != null
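The validation switch matters because JavaUtils interprets a suffix-less string in the unit of the method that parses it: byteStringAsBytes("4200") reads 4200 bytes, while byteStringAsMb("4200") reads 4200 MiB — the same convention the launcher now applies when it builds -Xmx. A rough, self-contained mimic of that behavior (the real parser in org.apache.spark.network.util.JavaUtils handles more suffix forms; this sketch covers only the cases discussed here):

    import java.util.Locale;

    public class ByteStringMbSketch {
      // Mimic of byteStringAsMb semantics, not Spark's actual implementation.
      static long byteStringAsMb(String s) {
        String v = s.trim().toLowerCase(Locale.ROOT);
        if (v.chars().allMatch(Character::isDigit)) {
          return Long.parseLong(v);              // bare number: already MiB
        }
        long num = Long.parseLong(v.substring(0, v.length() - 1));
        switch (v.charAt(v.length() - 1)) {
          case 'k': return num / 1024;           // KiB -> MiB, truncating
          case 'm': return num;
          case 'g': return num * 1024;
          case 't': return num * 1024L * 1024;
          default: throw new NumberFormatException("Unknown suffix: " + s);
        }
      }

      public static void main(String[] args) {
        System.out.println(byteStringAsMb("4200")); // 4200 (MiB), not 4200 bytes
        System.out.println(byteStringAsMb("2g"));   // 2048
        System.out.println(byteStringAsMb("512k")); // 0 -> fails a "> 0" check
      }
    }

A side effect, assuming the real parser truncates the way integer division does here: sub-MiB values such as 512k now come back as 0 and are rejected by the positivity check, which is arguably the right outcome for a driver memory setting.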
From 76a297d53916b193bebe89b15572d8e3ca9da918 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Mon, 3 Aug 2020 15:04:24 +0200
Subject: [PATCH 4/5] add warning message when suffix is missing

---
 .../java/org/apache/spark/launcher/CommandBuilderUtils.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 4a9d982090cc6..fdc93c2d6e601 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -333,6 +333,8 @@ static String findJarsDir(String sparkHome, String scalaVersion, boolean failIfN
    */
   static String addDefaultMSuffixIfNeeded(String memoryString) {
     if (memoryString.chars().allMatch(Character::isDigit)) {
+      System.err.println("Memory setting without explicit unit ("
+        + memoryString + ") is taken to be in MB by default! For details check SPARK-32293.");
       return memoryString + "m";
     } else {
       return memoryString;
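Lifted into a runnable sketch, the final form of the helper makes the new warning easy to see in isolation (the real method is package-private in org.apache.spark.launcher.CommandBuilderUtils, so this copy exists purely for illustration):

    public class WarningSketch {
      // Body as in the patch above, including the SPARK-32293 warning.
      static String addDefaultMSuffixIfNeeded(String memoryString) {
        if (memoryString.chars().allMatch(Character::isDigit)) {
          System.err.println("Memory setting without explicit unit ("
            + memoryString + ") is taken to be in MB by default! For details check SPARK-32293.");
          return memoryString + "m";
        } else {
          return memoryString;
        }
      }

      public static void main(String[] args) {
        System.out.println(addDefaultMSuffixIfNeeded("4200")); // stdout: 4200m, warning on stderr
        System.out.println(addDefaultMSuffixIfNeeded("2g"));   // stdout: 2g, no warning
      }
    }

Writing to System.err rather than through a logger is presumably deliberate: the launcher module is meant to stay free of logging-framework dependencies.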
From 8dfe64373fdf5020f635c1ccac003c97d9a64df4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Mon, 3 Aug 2020 15:05:41 +0200
Subject: [PATCH 5/5] fix EXECUTOR_MEMORY check in UnifiedMemoryManager

---
 .../scala/org/apache/spark/memory/UnifiedMemoryManager.scala | 2 +-
 docs/spark-standalone.md                                     | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 7282a83f0739f..ebd32459e474a 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -222,7 +222,7 @@ object UnifiedMemoryManager {
     }
     // SPARK-12759 Check executor memory to fail fast if memory is insufficient
     if (conf.contains(config.EXECUTOR_MEMORY)) {
-      val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
+      val executorMemory = conf.getSizeAsMb(config.EXECUTOR_MEMORY.key)
       if (executorMemory < minSystemMemory) {
         throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
           s"$minSystemMemory. Please increase executor memory using the " +
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 5d0039a018635..9fc6377d4f06e 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -144,7 +144,10 @@ You can optionally configure the cluster further by setting environment variable
   <tr>
     <td><code>SPARK_WORKER_MEMORY</code></td>
-    <td>Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GiB); note that each application's individual memory is configured using its spark.executor.memory property.</td>
+    <td>
+      Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GiB); note that each application's individual memory is configured using its spark.executor.memory property.
+      This can be configured in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") using "m" as the default unit.
+    </td>
   </tr>
   <tr>
     <td><code>SPARK_WORKER_PORT</code></td>
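One unit-consistency question about the UnifiedMemoryManager hunk: getSizeAsMb returns MiB, but minSystemMemory in the surrounding method is — at least in the version of the file this series appears to be based on — derived in bytes from RESERVED_SYSTEM_MEMORY_BYTES (300 MiB, times 1.5). If that is still the case, the threshold (and the error message that prints both values) would need to move to MiB as well, or every sane executor setting would fail the comparison. A back-of-envelope check, treating the constant as an assumption:

    public class ThresholdUnitSketch {
      public static void main(String[] args) {
        // Assumed from UnifiedMemoryManager: RESERVED_SYSTEM_MEMORY_BYTES = 300 MiB.
        long reservedBytes = 300L * 1024 * 1024;
        long minSystemMemoryBytes = (long) Math.ceil(reservedBytes * 1.5);
        System.out.println(minSystemMemoryBytes);                 // 471859200 (bytes)
        System.out.println(minSystemMemoryBytes / (1024 * 1024)); // 450 (MiB)

        // With this patch a 4 GiB executor yields executorMemory = 4096 (MiB):
        long executorMemoryMb = 4096;
        System.out.println(executorMemoryMb < minSystemMemoryBytes);                 // true: spurious failure
        System.out.println(executorMemoryMb < minSystemMemoryBytes / (1024 * 1024)); // false: intended result
      }
    }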