Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
Expand All @@ -39,12 +38,8 @@
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Properties;


/***
Expand All @@ -64,24 +59,24 @@ public class LivyHelper {
public Integer createSession(InterpreterContext context, String kind) throws Exception {
try {
Map<String, String> conf = new HashMap<String, String>();

Iterator<Entry<Object, Object>> it = property.entrySet().iterator();
while (it.hasNext()) {
Entry<Object, Object> pair = it.next();
if (pair.getKey().toString().startsWith("livy.spark.") &&
if (pair.getKey().toString().startsWith("livy.spark.") &&
!pair.getValue().toString().isEmpty())
conf.put(pair.getKey().toString().substring(5), pair.getValue().toString());
}

String confData = gson.toJson(conf);

String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
"POST",
"POST",
"{" +
"\"kind\": \"" + kind + "\", " +
"\"conf\": " + confData + ", " +
"\"proxyUser\": " + context.getAuthenticationInfo().getUser() +
"}",
"\"conf\": " + confData + ", " +
"\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() +
"\"}",
context.getParagraphId()
);

Expand All @@ -96,9 +91,8 @@ public Integer createSession(InterpreterContext context, String kind) throws Exc
LOGGER.error(String.format("sessionId:%s state is %s",
jsonMap.get("id"), jsonMap.get("state")));
Thread.sleep(1000);
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
"GET", null,
context.getParagraphId());
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
sessionId, "GET", null, context.getParagraphId());
jsonMap = (Map<Object, Object>) gson.fromJson(json,
new TypeToken<Map<Object, Object>>() {
}.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ public class LivyPySparkInterpreter extends Interpreter {

Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);

static {
Interpreter.register(
"pyspark",
"livy",
LivyPySparkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.build()
);
}

protected Map<String, Integer> userSessionMap;
protected LivyHelper livyHelper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,9 @@
*/
public class LivySparkInterpreter extends Interpreter {

static String DEFAULT_URL = "http://localhost:8998";
static String LOCAL = "local[*]";
Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class);
private LivyOutputStream out;

static {
Interpreter.register(
"spark",
"livy",
LivySparkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.")
.add("livy.spark.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077")
.add("livy.spark.driver.cores", "", "Driver cores. ex) 1, 2")
.add("livy.spark.driver.memory", "", "Driver memory. ex) 512m, 32g")
.add("livy.spark.executor.instances", "", "Executor instances. ex) 1, 4")
.add("livy.spark.executor.cores", "", "Num cores per executor. ex) 1, 4")
.add("livy.spark.executor.memory", "",
"Executor memory per worker instance. ex) 512m, 32g")
.add("livy.spark.dynamicAllocation.enabled", "", "Use dynamic resource allocation")
.add("livy.spark.dynamicAllocation.cachedExecutorIdleTimeout", "",
"Remove an executor which has cached data blocks")
.add("livy.spark.dynamicAllocation.minExecutors", "",
"Lower bound for the number of executors if dynamic allocation is enabled. ")
.add("livy.spark.dynamicAllocation.initialExecutors", "",
"Initial number of executors to run if dynamic allocation is enabled. ")
.add("livy.spark.dynamicAllocation.maxExecutors", "",
"Upper bound for the number of executors if dynamic allocation is enabled. ")
.build()
);
}

protected static Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ public class LivySparkRInterpreter extends Interpreter {

Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class);

static {
Interpreter.register(
"sparkr",
"livy",
LivySparkRInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.build()
);
}

protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,6 @@
public class LivySparkSQLInterpreter extends Interpreter {

Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class);
static String DEFAULT_MAX_RESULT = "1000";

static {
Interpreter.register(
"sql",
"livy",
LivySparkSQLInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("zeppelin.livy.spark.maxResult",
DEFAULT_MAX_RESULT,
"Max number of SparkSQL result to display.")
.build()
);
}

protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
Expand Down Expand Up @@ -94,7 +80,7 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo
line.replaceAll("\"", "\\\\\"")
.replaceAll("\\n", " ")
+ "\").show(" +
property.get("zeppelin.livy.spark.maxResult") + ")",
property.get("livy.spark.sql.maxResult") + ")",
interpreterContext, userSessionMap);

if (res.code() == InterpreterResult.Code.SUCCESS) {
Expand Down
97 changes: 97 additions & 0 deletions livy/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
[
{
"group": "livy",
"name": "spark",
"className": "org.apache.zeppelin.livy.LivySparkInterpreter",
"properties": {
"zeppelin.livy.url": {
"envName": "ZEPPELIN_LIVY_HOST_URL",
"propertyName": "zeppelin.livy.url",
"defaultValue": "http://localhost:8998",
"description": "The URL for Livy Server."
},
"livy.spark.master": {
"propertyName": "livy.spark.master",
"defaultValue": "local[*]",
"description": "Spark master uri. ex) spark://masterhost:7077"
},
"livy.spark.driver.cores": {
"propertyName": "livy.spark.driver.cores",
"defaultValue": "",
"description": "Driver cores. ex) 1, 2"
},
"livy.spark.driver.memory": {
"propertyName": "livy.spark.driver.memory",
"defaultValue": "",
"description": "Driver memory. ex) 512m, 32g"
},
"livy.spark.executor.instances": {
"propertyName": "livy.spark.executor.instances",
"defaultValue": "",
"description": "Executor instances. ex) 1, 4"
},
"livy.spark.executor.cores": {
"propertyName": "livy.spark.executor.cores",
"defaultValue": "",
"description": "Num cores per executor. ex) 1, 4"
},
"livy.spark.executor.memory": {
"propertyName": "livy.spark.executor.memory",
"defaultValue": "",
"description": "Executor memory per worker instance. ex) 512m, 32g"
},
"livy.spark.dynamicAllocation.enabled": {
"propertyName": "livy.spark.dynamicAllocation.enabled",
"defaultValue": "",
"description": "Use dynamic resource allocation"
},
"livy.spark.dynamicAllocation.cachedExecutorIdleTimeout": {
"propertyName": "livy.spark.dynamicAllocation.cachedExecutorIdleTimeout",
"defaultValue": "",
"description": "Remove an executor which has cached data blocks"
},
"livy.spark.dynamicAllocation.minExecutors": {
"propertyName": "livy.spark.dynamicAllocation.minExecutors",
"defaultValue": "",
"description": "Lower bound for the number of executors if dynamic allocation is enabled."
},
"livy.spark.dynamicAllocation.initialExecutors": {
"propertyName": "livy.spark.dynamicAllocation.initialExecutors",
"defaultValue": "",
"description": "Initial number of executors to run if dynamic allocation is enabled."
},
"livy.spark.dynamicAllocation.maxExecutors": {
"propertyName": "livy.spark.dynamicAllocation.maxExecutors",
"defaultValue": "",
"description": "Upper bound for the number of executors if dynamic allocation is enabled."
}
}
},
{
"group": "livy",
"name": "sql",
"className": "org.apache.zeppelin.livy.LivySparkSQLInterpreter",
"properties": {
"zeppelin.livy.spark.sql.maxResult": {
"envName": "ZEPPELIN_LIVY_MAXRESULT",
"propertyName": "zeppelin.livy.spark.sql.maxResult",
"defaultValue": "1000",
"description": "Max number of SparkSQL result to display."
}
}
},
{
"group": "livy",
"name": "pyspark",
"className": "org.apache.zeppelin.livy.LivyPySparkInterpreter",
"properties": {
}
},
{
"group": "livy",
"name": "sparkr",
"className": "org.apache.zeppelin.livy.LivySparkRInterpreter",
"properties": {
}
}
]