diff --git a/k8s/interpreter/100-interpreter-spec.yaml b/k8s/interpreter/100-interpreter-spec.yaml index c331a9969aa..76f1deaefcc 100644 --- a/k8s/interpreter/100-interpreter-spec.yaml +++ b/k8s/interpreter/100-interpreter-spec.yaml @@ -54,6 +54,15 @@ spec: - name: {{key}} value: {{value}} {% endfor %} + {% if zeppelin.k8s.interpreter.cores is defined and zeppelin.k8s.interpreter.memory is defined %} + resources: + requests: + memory: "{{zeppelin.k8s.interpreter.memory}}" + cpu: "{{zeppelin.k8s.interpreter.cores}}" +{# limits.memory is not set because of a potential OOM-Killer. https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits #} + limits: + cpu: "{{zeppelin.k8s.interpreter.cores}}" + {% endif %} {% if zeppelin.k8s.interpreter.group.name == "spark" %} volumeMounts: - name: spark-home diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index c167ae75372..864f660bb32 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -44,6 +44,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private String userName; private AtomicBoolean started = new AtomicBoolean(false); + private Random rand = new Random(); + + private static final String SPARK_DRIVER_MEMROY = "spark.driver.memory"; + private static final String SPARK_DRIVER_MEMROY_OVERHEAD = "spark.driver.memoryOverhead"; + private static final String SPARK_DRIVER_CORES = "spark.driver.cores"; + private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN"; public K8sRemoteInterpreterProcess( Kubectl kubectl, @@ -273,7 +279,7 @@ Properties getTemplateBindings() { } // environment variables - envs.put("SERVICE_DOMAIN", envs.getOrDefault("SERVICE_DOMAIN", System.getenv("SERVICE_DOMAIN"))); + envs.put(ENV_SERVICE_DOMAIN, envs.getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN))); envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin")); if (isSpark()) { @@ -294,8 +300,22 @@ Properties getTemplateBindings() { webUrl, webUiPort, getPodName(), - envs.get("SERVICE_DOMAIN") + envs.get(ENV_SERVICE_DOMAIN) )); + // Resources of Interpreter Pod + if (properties.containsKey(SPARK_DRIVER_MEMROY)) { + String memory; + if (properties.containsKey(SPARK_DRIVER_MEMROY_OVERHEAD)) { + memory = K8sUtils.calculateSparkMemory(properties.getProperty(SPARK_DRIVER_MEMROY), + properties.getProperty(SPARK_DRIVER_MEMROY_OVERHEAD)); + } else { + memory = K8sUtils.calculateMemoryWithDefaultOverhead(properties.getProperty(SPARK_DRIVER_MEMROY)); + } + k8sProperties.put("zeppelin.k8s.interpreter.memory", memory); + } + if (properties.containsKey(SPARK_DRIVER_CORES)) { + k8sProperties.put("zeppelin.k8s.interpreter.cores", properties.getProperty(SPARK_DRIVER_CORES)); + } } k8sProperties.put("zeppelin.k8s.envs", envs); @@ -310,7 +330,7 @@ String sparkUiWebUrlFromTemplate(String templateString, int port, String service ImmutableMap binding = ImmutableMap.of( "PORT", port, "SERVICE_NAME", serviceName, - "SERVICE_DOMAIN", serviceDomain + ENV_SERVICE_DOMAIN, serviceDomain ); ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); @@ -339,8 +359,8 @@ String buildSparkSubmitOptions() { options.append(" --master k8s://https://kubernetes.default.svc"); options.append(" --deploy-mode client"); - if (properties.containsKey("spark.driver.memory")) { - options.append(" --driver-memory " + properties.get("spark.driver.memory")); + if (properties.containsKey(SPARK_DRIVER_MEMROY)) { + options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMROY)); } if (userName != null) { options.append(" --proxy-user " + userName); @@ -398,9 +418,8 @@ private String getRandomString(int length) { char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); StringBuilder sb = new StringBuilder(); - Random random = new Random(); for (int i = 0; i < length; i++) { - char c = chars[random.nextInt(chars.length)]; + char c = chars[rand.nextInt(chars.length)]; sb.append(c); } return sb.toString(); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java new file mode 100644 index 00000000000..7a629066b07 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java @@ -0,0 +1,59 @@ +package org.apache.zeppelin.interpreter.launcher; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; + +public class K8sUtils { + + private static final long K = 1024; + private static final long M = K * K; + private static final long G = M * K; + private static final long T = G * K; + private static final long MINIMUM_OVERHEAD = 384; + + private K8sUtils() { + // do nothing + } + + public static String calculateMemoryWithDefaultOverhead(String memory) { + long memoryMB = convertToBytes(memory) / M; + long memoryOverheadMB = Math.max((long) (memoryMB * 0.1f), MINIMUM_OVERHEAD); + return (memoryMB + memoryOverheadMB) + "Mi"; + } + + public static String calculateSparkMemory(String memory, String memoryOverhead) { + long memoryMB = convertToBytes(memory) / M; + long memoryOverheadMB = convertToBytes(memoryOverhead) / M; + return (memoryMB + memoryOverheadMB) + "Mi"; + } + + private static long convertToBytes(String memory) { + String lower = memory.toLowerCase().trim(); + Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower); + long value; + String suffix; + if (m.matches()) { + value = Long.parseLong(m.group(1)); + suffix = m.group(2); + } else { + throw new NumberFormatException("Failed to parse string: " + memory); + } + + long memoryAmountBytes = value; + if (StringUtils.containsIgnoreCase(suffix, "k")) { + memoryAmountBytes = value * K; + } else if (StringUtils.containsIgnoreCase(suffix, "m")) { + memoryAmountBytes = value * M; + } else if (StringUtils.containsIgnoreCase(suffix, "g")) { + memoryAmountBytes = value * G; + } else if (StringUtils.containsIgnoreCase(suffix, "t")) { + memoryAmountBytes = value * T; + } + if (0 > memoryAmountBytes) { + throw new NumberFormatException("Conversion of " + memory + " exceeds Long.MAX_VALUE"); + } + return memoryAmountBytes; + } +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 52c562186f3..7e1e6685ff0 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -141,8 +141,8 @@ public void testGetTemplateBindings() throws IOException { assertEquals("V1", envs.get("MY_ENV1")); envs = (HashMap) p.get("zeppelin.k8s.envs"); - assertEquals(true, envs.containsKey("SERVICE_DOMAIN")); - assertEquals(true, envs.containsKey("ZEPPELIN_HOME")); + assertTrue(envs.containsKey("SERVICE_DOMAIN")); + assertTrue(envs.containsKey("ZEPPELIN_HOME")); } @Test @@ -337,4 +337,78 @@ public void testSparkUiWebUrlTemplate() { "zeppelin-server", "my.domain.com")); } + + @Test + public void testSparkPodResources() { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + properties.put("spark.driver.memory", "1g"); + properties.put("spark.driver.cores", "1"); + HashMap envs = new HashMap(); + envs.put("SERVICE_DOMAIN", "mydomain"); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "spark", + "myspark", + properties, + envs, + "zeppelin.server.service", + "12320", + false, + "spark-container:1.0", + 10, + false); + + // when + Properties p = intp.getTemplateBindings(); + + // then + assertEquals("1", p.get("zeppelin.k8s.interpreter.cores")); + assertEquals("1408Mi", p.get("zeppelin.k8s.interpreter.memory")); + } + + @Test + public void testSparkPodResourcesMemoryOverhead() { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + properties.put("spark.driver.memory", "1g"); + properties.put("spark.driver.memoryOverhead", "256m"); + properties.put("spark.driver.cores", "5"); + HashMap envs = new HashMap(); + envs.put("SERVICE_DOMAIN", "mydomain"); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "spark", + "myspark", + properties, + envs, + "zeppelin.server.service", + "12320", + false, + "spark-container:1.0", + 10, + false); + + // when + Properties p = intp.getTemplateBindings(); + + // then + assertEquals("5", p.get("zeppelin.k8s.interpreter.cores")); + assertEquals("1280Mi", p.get("zeppelin.k8s.interpreter.memory")); + } + } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sUtilsTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sUtilsTest.java new file mode 100644 index 00000000000..cab56ac20b0 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sUtilsTest.java @@ -0,0 +1,30 @@ +package org.apache.zeppelin.interpreter.launcher; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class K8sUtilsTest { + + @Test + public void testConvert() { + assertEquals("484Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100m")); + assertEquals("1408Mi", K8sUtils.calculateMemoryWithDefaultOverhead("1Gb")); + assertEquals("4505Mi", K8sUtils.calculateMemoryWithDefaultOverhead("4Gb")); + assertEquals("6758Mi", K8sUtils.calculateMemoryWithDefaultOverhead("6Gb")); + assertEquals("9011Mi", K8sUtils.calculateMemoryWithDefaultOverhead("8Gb")); + // some extrem values + assertEquals("112640Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100Gb")); + assertEquals("115343360Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100Tb")); + } + + @Test(expected = NumberFormatException.class) + public void testExceptionMaxLong() { + K8sUtils.calculateMemoryWithDefaultOverhead("10000000Tb"); + } + + @Test(expected = NumberFormatException.class) + public void testExceptionNoValidNumber() { + K8sUtils.calculateMemoryWithDefaultOverhead("NoValidNumber10000000Tb"); + } +}