diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 2c66fa92f16..ec77f1a7edf 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -96,7 +96,7 @@ public Integer createSession(InterpreterContext context, String kind) throws Exc }.getType()); if (jsonMap.get("state").equals("idle")) { break; - } else if (jsonMap.get("state").equals("error")) { + } else if (jsonMap.get("state").equals("error") || jsonMap.get("state").equals("dead")) { json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId + "/log", "GET", null, @@ -124,7 +124,7 @@ public Integer createSession(InterpreterContext context, String kind) throws Exc protected void initializeSpark(final InterpreterContext context, final Map userSessionMap) throws Exception { - interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" + + interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" + "import sqlContext.implicits._", context, userSessionMap); } diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 806d7aa6b60..22773dfb1d2 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -80,7 +80,7 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo line.replaceAll("\"", "\\\\\"") .replaceAll("\\n", " ") + "\").show(" + - property.get("livy.spark.sql.maxResult") + ")", + property.get("zeppelin.livy.spark.sql.maxResult") + ")", interpreterContext, userSessionMap); if (res.code() == InterpreterResult.Code.SUCCESS) { @@ -123,6 +123,10 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo } } + public boolean concurrentSQL() { + return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL")); + } + @Override public void cancel(InterpreterContext context) { livyHelper.cancelHTTP(context.getParagraphId()); @@ -140,8 +144,19 @@ public int getProgress(InterpreterContext context) { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - LivySparkInterpreter.class.getName() + this.hashCode()); + if (concurrentSQL()) { + int maxConcurrency = 10; + return SchedulerFactory.singleton().createOrGetParallelScheduler( + LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency); + } else { + Interpreter intp = + getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName()); + if (intp != null) { + return intp.getScheduler(); + } else { + return null; + } + } } @Override diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json index 232bcb6cf23..468e9d92660 100644 --- a/livy/src/main/resources/interpreter-setting.json +++ b/livy/src/main/resources/interpreter-setting.json @@ -88,6 +88,11 @@ "propertyName": "zeppelin.livy.spark.sql.maxResult", "defaultValue": "1000", "description": "Max number of SparkSQL result to display." + }, + "zeppelin.livy.concurrentSQL": { + "propertyName": "zeppelin.livy.concurrentSQL", + "defaultValue": "false", + "description": "Execute multiple SQL concurrently if set true." } } },