diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 0c8c8e2057d..8ed4622b0b4 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -21,10 +21,7 @@ import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.apache.zeppelin.interpreter.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpEntity; @@ -39,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; /** * Base class for livy interpreters. @@ -48,10 +46,11 @@ public abstract class BaseLivyInterprereter extends Interpreter { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class); private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create(); - protected SessionInfo sessionInfo; + protected volatile SessionInfo sessionInfo; private String livyURL; private long sessionCreationTimeout; protected boolean displayAppInfo; + private AtomicBoolean sessionExpired = new AtomicBoolean(false); public BaseLivyInterprereter(Properties property) { super(property); @@ -90,16 +89,17 @@ protected void initLivySession() throws LivyException { // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it // explicitly by ourselves. sessionInfo.appId = extractStatementResult( - interpret("sc.applicationId", false).message() + interpret("sc.applicationId", false, false).message() .get(0).getData()); } interpret( - "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false); + "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", + false, false); if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) { sessionInfo.webUIAddress = extractStatementResult( interpret( - "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false) + "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false) .message().get(0).getData()); } else { sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl"); @@ -120,7 +120,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) { } try { - return interpret(st, this.displayAppInfo); + return interpret(st, this.displayAppInfo, true); } catch (LivyException e) { LOGGER.error("Fail to interpret:" + st, e); return new InterpreterResult(InterpreterResult.Code.ERROR, @@ -206,9 +206,26 @@ private SessionInfo getSessionInfo(int sessionId) throws LivyException { return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET")); } - public InterpreterResult interpret(String code, boolean displayAppInfo) + public InterpreterResult interpret(String code, boolean displayAppInfo, + boolean appendSessionExpired) throws LivyException { - StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code)); + StatementInfo stmtInfo = null; + boolean sessionExpired = false; + try { + stmtInfo = executeStatement(new ExecuteRequest(code)); + } catch (SessionNotFoundException e) { + LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id); + sessionExpired = true; + // we don't want to create multiple sessions because it is possible to have multiple thread + // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need + // to check session status again in this sync block + synchronized (this) { + if (isSessionExpired()) { + initLivySession(); + } + } + stmtInfo = executeStatement(new ExecuteRequest(code)); + } // pull the statement status while (!stmtInfo.isAvailable()) { try { @@ -219,7 +236,38 @@ public InterpreterResult interpret(String code, boolean displayAppInfo) } stmtInfo = getStatementInfo(stmtInfo.id); } - return getResultFromStatementInfo(stmtInfo, displayAppInfo); + if (appendSessionExpired) { + return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo), + sessionExpired); + } else { + return getResultFromStatementInfo(stmtInfo, displayAppInfo); + } + } + + private boolean isSessionExpired() throws LivyException { + try { + getSessionInfo(sessionInfo.id); + return false; + } catch (SessionNotFoundException e) { + return true; + } catch (LivyException e) { + throw e; + } + } + + private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) { + if (sessionExpired) { + InterpreterResult result2 = new InterpreterResult(result.code()); + result2.add(InterpreterResult.Type.HTML, + "Previous livy session is expired, new livy session is created. " + + "Paragraphs that depend on this paragraph need to be re-executed!" + ""); + for (InterpreterResultMessage message : result.message()) { + result2.add(message.getType(), message.getData()); + } + return result2; + } else { + return result; + } } private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo, @@ -340,7 +388,7 @@ private String callRestAPI(String targetURL, String method, String jsonData) || response.getStatusCode().value() == 201 || response.getStatusCode().value() == 404) { String responseBody = response.getBody(); - if (responseBody.matches("Session '\\d+' not found.")) { + if (responseBody.matches("\"Session '\\d+' not found.\"")) { throw new SessionNotFoundException(responseBody); } else { return responseBody; diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index cdc8b5b5560..0e78860ceab 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -51,7 +51,7 @@ public void open() { // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession // to judge whether it is using spark2. try { - InterpreterResult result = sparkInterpreter.interpret("spark", false); + InterpreterResult result = sparkInterpreter.interpret("spark", false, false); if (result.code() == InterpreterResult.Code.SUCCESS && result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) { LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}", @@ -59,7 +59,7 @@ public void open() { isSpark2 = true; } else { // spark 1.x - result = sparkInterpreter.interpret("sqlContext", false); + result = sparkInterpreter.interpret("sqlContext", false, false); if (result.code() == InterpreterResult.Code.SUCCESS) { LOGGER.info("sqlContext is detected."); } else if (result.code() == InterpreterResult.Code.ERROR) { @@ -68,7 +68,7 @@ public void open() { LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves"); result = sparkInterpreter.interpret( "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" - + "import sqlContext.implicits._", false); + + "import sqlContext.implicits._", false, false); if (result.code() == InterpreterResult.Code.ERROR) { throw new LivyException("Fail to create SQLContext," + result.message().get(0).getData()); @@ -113,37 +113,44 @@ public InterpreterResult interpret(String line, InterpreterContext context) { } else { sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")"; } - InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo); - - if (res.code() == InterpreterResult.Code.SUCCESS) { - StringBuilder resMsg = new StringBuilder(); - resMsg.append("%table "); - String[] rows = res.message().get(0).getData().split("\n"); - String[] headers = rows[1].split("\\|"); - for (int head = 1; head < headers.length; head++) { - resMsg.append(headers[head].trim()).append("\t"); - } - resMsg.append("\n"); - if (rows[3].indexOf("+") == 0) { - - } else { - for (int cols = 3; cols < rows.length - 1; cols++) { - String[] col = rows[cols].split("\\|"); - for (int data = 1; data < col.length; data++) { - resMsg.append(col[data].trim()).append("\t"); + InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true); + + if (result.code() == InterpreterResult.Code.SUCCESS) { + InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS); + for (InterpreterResultMessage message : result.message()) { + // convert Text type to Table type. We assume the text type must be the sql output. This + // assumption is correct for now. Ideally livy should return table type. We may do it in + // the future release of livy. + if (message.getType() == InterpreterResult.Type.TEXT) { + StringBuilder resMsg = new StringBuilder(); + String[] rows = message.getData().split("\n"); + String[] headers = rows[1].split("\\|"); + for (int head = 1; head < headers.length; head++) { + resMsg.append(headers[head].trim()).append("\t"); } resMsg.append("\n"); + if (rows[3].indexOf("+") == 0) { + + } else { + for (int cols = 3; cols < rows.length - 1; cols++) { + String[] col = rows[cols].split("\\|"); + for (int data = 1; data < col.length; data++) { + resMsg.append(col[data].trim()).append("\t"); + } + resMsg.append("\n"); + } + } + if (rows[rows.length - 1].indexOf("only") == 0) { + resMsg.append("" + rows[rows.length - 1] + "."); + } + result2.add(InterpreterResult.Type.TABLE, resMsg.toString()); + } else { + result2.add(message.getType(), message.getData()); } } - if (rows[rows.length - 1].indexOf("only") == 0) { - resMsg.append("" + rows[rows.length - 1] + "."); - } - - return new InterpreterResult(InterpreterResult.Code.SUCCESS, - resMsg.toString() - ); + return result2; } else { - return res; + return result; } } catch (Exception e) { LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);