Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions k8s/interpreter/100-interpreter-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ spec:
- name: {{key}}
value: {{value}}
{% endfor %}
{% if zeppelin.k8s.interpreter.cores is defined and zeppelin.k8s.interpreter.memory is defined %}
resources:
requests:
memory: "{{zeppelin.k8s.interpreter.memory}}"
cpu: "{{zeppelin.k8s.interpreter.cores}}"
{# limits.memory is not set because of a potential OOM-Killer. https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits #}
limits:
cpu: "{{zeppelin.k8s.interpreter.cores}}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - How about adding some short comments here about why limits.memory is not configured? maybe a link to this pull request description? So later, other people can see this as a intentional when read the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

{% endif %}
{% if zeppelin.k8s.interpreter.group.name == "spark" %}
volumeMounts:
- name: spark-home
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private String userName;

private AtomicBoolean started = new AtomicBoolean(false);
private Random rand = new Random();

private static final String SPARK_DRIVER_MEMROY = "spark.driver.memory";
private static final String SPARK_DRIVER_MEMROY_OVERHEAD = "spark.driver.memoryOverhead";
private static final String SPARK_DRIVER_CORES = "spark.driver.cores";
private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN";

public K8sRemoteInterpreterProcess(
Kubectl kubectl,
Expand Down Expand Up @@ -273,7 +279,7 @@ Properties getTemplateBindings() {
}

// environment variables
envs.put("SERVICE_DOMAIN", envs.getOrDefault("SERVICE_DOMAIN", System.getenv("SERVICE_DOMAIN")));
envs.put(ENV_SERVICE_DOMAIN, envs.getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN)));
envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin"));

if (isSpark()) {
Expand All @@ -294,8 +300,22 @@ Properties getTemplateBindings() {
webUrl,
webUiPort,
getPodName(),
envs.get("SERVICE_DOMAIN")
envs.get(ENV_SERVICE_DOMAIN)
));
// Resources of Interpreter Pod
if (properties.containsKey(SPARK_DRIVER_MEMROY)) {
String memory;
if (properties.containsKey(SPARK_DRIVER_MEMROY_OVERHEAD)) {
memory = K8sUtils.calculateSparkMemory(properties.getProperty(SPARK_DRIVER_MEMROY),
properties.getProperty(SPARK_DRIVER_MEMROY_OVERHEAD));
} else {
memory = K8sUtils.calculateMemoryWithDefaultOverhead(properties.getProperty(SPARK_DRIVER_MEMROY));
}
k8sProperties.put("zeppelin.k8s.interpreter.memory", memory);
}
if (properties.containsKey(SPARK_DRIVER_CORES)) {
k8sProperties.put("zeppelin.k8s.interpreter.cores", properties.getProperty(SPARK_DRIVER_CORES));
}
}

k8sProperties.put("zeppelin.k8s.envs", envs);
Expand All @@ -310,7 +330,7 @@ String sparkUiWebUrlFromTemplate(String templateString, int port, String service
ImmutableMap<String, Object> binding = ImmutableMap.of(
"PORT", port,
"SERVICE_NAME", serviceName,
"SERVICE_DOMAIN", serviceDomain
ENV_SERVICE_DOMAIN, serviceDomain
);

ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
Expand Down Expand Up @@ -339,8 +359,8 @@ String buildSparkSubmitOptions() {

options.append(" --master k8s://https://kubernetes.default.svc");
options.append(" --deploy-mode client");
if (properties.containsKey("spark.driver.memory")) {
options.append(" --driver-memory " + properties.get("spark.driver.memory"));
if (properties.containsKey(SPARK_DRIVER_MEMROY)) {
options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMROY));
}
if (userName != null) {
options.append(" --proxy-user " + userName);
Expand Down Expand Up @@ -398,9 +418,8 @@ private String getRandomString(int length) {
char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();

StringBuilder sb = new StringBuilder();
Random random = new Random();
for (int i = 0; i < length; i++) {
char c = chars[random.nextInt(chars.length)];
char c = chars[rand.nextInt(chars.length)];
sb.append(c);
}
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.zeppelin.interpreter.launcher;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;

public class K8sUtils {

private static final long K = 1024;
private static final long M = K * K;
private static final long G = M * K;
private static final long T = G * K;
private static final long MINIMUM_OVERHEAD = 384;

private K8sUtils() {
// do nothing
}

public static String calculateMemoryWithDefaultOverhead(String memory) {
long memoryMB = convertToBytes(memory) / M;
long memoryOverheadMB = Math.max((long) (memoryMB * 0.1f), MINIMUM_OVERHEAD);
return (memoryMB + memoryOverheadMB) + "Mi";
}

public static String calculateSparkMemory(String memory, String memoryOverhead) {
long memoryMB = convertToBytes(memory) / M;
long memoryOverheadMB = convertToBytes(memoryOverhead) / M;
return (memoryMB + memoryOverheadMB) + "Mi";
}

private static long convertToBytes(String memory) {
String lower = memory.toLowerCase().trim();
Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
long value;
String suffix;
if (m.matches()) {
value = Long.parseLong(m.group(1));
suffix = m.group(2);
} else {
throw new NumberFormatException("Failed to parse string: " + memory);
}

long memoryAmountBytes = value;
if (StringUtils.containsIgnoreCase(suffix, "k")) {
memoryAmountBytes = value * K;
} else if (StringUtils.containsIgnoreCase(suffix, "m")) {
memoryAmountBytes = value * M;
} else if (StringUtils.containsIgnoreCase(suffix, "g")) {
memoryAmountBytes = value * G;
} else if (StringUtils.containsIgnoreCase(suffix, "t")) {
memoryAmountBytes = value * T;
}
if (0 > memoryAmountBytes) {
throw new NumberFormatException("Conversion of " + memory + " exceeds Long.MAX_VALUE");
}
return memoryAmountBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public void testGetTemplateBindings() throws IOException {
assertEquals("V1", envs.get("MY_ENV1"));

envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
assertEquals(true, envs.containsKey("SERVICE_DOMAIN"));
assertEquals(true, envs.containsKey("ZEPPELIN_HOME"));
assertTrue(envs.containsKey("SERVICE_DOMAIN"));
assertTrue(envs.containsKey("ZEPPELIN_HOME"));
}

@Test
Expand Down Expand Up @@ -337,4 +337,78 @@ public void testSparkUiWebUrlTemplate() {
"zeppelin-server",
"my.domain.com"));
}

@Test
public void testSparkPodResources() {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");

Properties properties = new Properties();
properties.put("spark.driver.memory", "1g");
properties.put("spark.driver.cores", "1");
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("SERVICE_DOMAIN", "mydomain");

K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
kubectl,
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
"spark",
"myspark",
properties,
envs,
"zeppelin.server.service",
"12320",
false,
"spark-container:1.0",
10,
false);

// when
Properties p = intp.getTemplateBindings();

// then
assertEquals("1", p.get("zeppelin.k8s.interpreter.cores"));
assertEquals("1408Mi", p.get("zeppelin.k8s.interpreter.memory"));
}

@Test
public void testSparkPodResourcesMemoryOverhead() {
// given
Kubectl kubectl = mock(Kubectl.class);
when(kubectl.getNamespace()).thenReturn("default");

Properties properties = new Properties();
properties.put("spark.driver.memory", "1g");
properties.put("spark.driver.memoryOverhead", "256m");
properties.put("spark.driver.cores", "5");
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("SERVICE_DOMAIN", "mydomain");

K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
kubectl,
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
"spark",
"myspark",
properties,
envs,
"zeppelin.server.service",
"12320",
false,
"spark-container:1.0",
10,
false);

// when
Properties p = intp.getTemplateBindings();

// then
assertEquals("5", p.get("zeppelin.k8s.interpreter.cores"));
assertEquals("1280Mi", p.get("zeppelin.k8s.interpreter.memory"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.zeppelin.interpreter.launcher;

import static org.junit.Assert.*;

import org.junit.Test;

public class K8sUtilsTest {

@Test
public void testConvert() {
assertEquals("484Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100m"));
assertEquals("1408Mi", K8sUtils.calculateMemoryWithDefaultOverhead("1Gb"));
assertEquals("4505Mi", K8sUtils.calculateMemoryWithDefaultOverhead("4Gb"));
assertEquals("6758Mi", K8sUtils.calculateMemoryWithDefaultOverhead("6Gb"));
assertEquals("9011Mi", K8sUtils.calculateMemoryWithDefaultOverhead("8Gb"));
// some extrem values
assertEquals("112640Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100Gb"));
assertEquals("115343360Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100Tb"));
}

@Test(expected = NumberFormatException.class)
public void testExceptionMaxLong() {
K8sUtils.calculateMemoryWithDefaultOverhead("10000000Tb");
}

@Test(expected = NumberFormatException.class)
public void testExceptionNoValidNumber() {
K8sUtils.calculateMemoryWithDefaultOverhead("NoValidNumber10000000Tb");
}
}