diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index ab0b4991a32..c4e8f2e4ee0 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -137,7 +137,7 @@ public InterpreterResult interpretInput(String stringLines, LivyOutputStream out, String appId, String webUI, - boolean displayAppInfo) { + boolean displayAppInfo) throws LivyNoSessionException { try { out.setInterpreterOutput(context.out); context.out.clear(); @@ -186,6 +186,9 @@ public InterpreterResult interpretInput(String stringLines, InterpreterResult res; try { res = interpret(incomplete + s, context, userSessionMap); + } catch (LivyNoSessionException e) { + out.write("%angular

Livy Session has been restarted. You might need to re-run paragraphs that current paragraph depends on.

"); + throw e; } catch (Exception e) { LOGGER.error("Interpreter exception", e); return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); @@ -222,6 +225,8 @@ public InterpreterResult interpretInput(String stringLines, out.setInterpreterOutput(null); return new InterpreterResult(Code.SUCCESS); } + } catch (LivyNoSessionException e) { + throw e; } catch (Exception e) { LOGGER.error("error in interpretInput", e); return new InterpreterResult(Code.ERROR, e.getMessage()); @@ -293,16 +298,21 @@ private InterpreterResult getResultFromMap(Map jsonMap) { } private Map executeCommand(String lines, InterpreterContext context, - Map userSessionMap) throws Exception { - String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/" + Map userSessionMap) throws LivyException { + String json; + try { + json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/" + userSessionMap.get(context.getAuthenticationInfo().getUser()) + "/statements", "POST", "{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}", context.getParagraphId()); - if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) { - throw new Exception("Exception: Session not found, Livy server would have restarted, " + - "or lost session."); + } catch (Exception e) { + throw new LivyException("Error executing command in Livy", e); + } + if (json == null || json.matches("^(\")?Session (\'[0-9]+\' )?not found(.?\"?)$")) { + LOGGER.warn("Livy session has been lost!"); + throw new LivyNoSessionException(); } try { Map jsonMap = gson.fromJson(json, @@ -311,7 +321,7 @@ private Map executeCommand(String lines, InterpreterContext context, return jsonMap; } catch (Exception e) { LOGGER.error("Error executeCommand", e); - throw e; + throw new LivyException("Error parsing Livy response", e); } } @@ -404,4 +414,30 @@ public void closeSession(Map userSessionMap) { } } } + + /** + * Recoverable exception thrown during interation with Livy REST API + */ + public static class LivyException extends Exception { + private static final long serialVersionUID = -443475407573079769L; + + public LivyException(String message) { + super(message); + } + + public LivyException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * This exception is thrown when Livy session was lost + */ + public static class LivyNoSessionException extends LivyException { + private static final long serialVersionUID = -984922534258421615L; + + public LivyNoSessionException() { + super("Session not found, Livy server would have restarted, " + "or lost session."); + } + } } diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java index bd342a2e3ed..009428c0f04 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java @@ -19,6 +19,7 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.livy.LivyHelper.LivyNoSessionException; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -77,6 +78,11 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo } return livyHelper.interpret(line, interpreterContext, userSessionMap); + + } catch (LivyNoSessionException e) { + userSessionMap.remove(interpreterContext.getAuthenticationInfo().getUser()); + return interpret(line, interpreterContext); + } catch (Exception e) { LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e); return new InterpreterResult(InterpreterResult.Code.ERROR, diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 9a9dd8080ee..550fa760c65 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -19,6 +19,7 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.livy.LivyHelper.LivyNoSessionException; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -110,6 +111,11 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out, sessionId2AppIdMap.get(sessionId), sessionId2WebUIMap.get(sessionId), displayAppInfo); + + } catch (LivyNoSessionException e) { + userSessionMap.remove(interpreterContext.getAuthenticationInfo().getUser()); + return interpret(line, interpreterContext); + } catch (Exception e) { LOGGER.error("Exception in LivySparkInterpreter while interpret ", e); return new InterpreterResult(InterpreterResult.Code.ERROR, diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java index 753b378e9a1..1206b819590 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java @@ -19,6 +19,7 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.livy.LivyHelper.LivyNoSessionException; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -77,6 +78,11 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo } return livyHelper.interpret(line, interpreterContext, userSessionMap); + + } catch (LivyNoSessionException e) { + userSessionMap.remove(interpreterContext.getAuthenticationInfo().getUser()); + return interpret(line, interpreterContext); + } catch (Exception e) { LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e); return new InterpreterResult(InterpreterResult.Code.ERROR, diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 3d4a0f4e428..18c061ca080 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -19,6 +19,7 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.livy.LivyHelper.LivyNoSessionException; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -113,7 +114,9 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo } else { return res; } - + } catch (LivyNoSessionException e) { + userSessionMap.remove(interpreterContext.getAuthenticationInfo().getUser()); + return interpret(line, interpreterContext); } catch (Exception e) { LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);