BaseLivyInterprereter.java
@@ -21,10 +21,7 @@
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.*;
Member:

what's our coding style guide on multiple imports? is it clearer to list them out?

Contributor Author:

This is done by IntelliJ, which uses * when more than 5 classes from one package are imported. I think it should be fine to do that.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
@@ -39,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Base class for livy interpreters.
@@ -48,10 +46,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();

protected SessionInfo sessionInfo;
protected volatile SessionInfo sessionInfo;
private String livyURL;
private long sessionCreationTimeout;
protected boolean displayAppInfo;
private AtomicBoolean sessionExpired = new AtomicBoolean(false);

public BaseLivyInterprereter(Properties property) {
super(property);
@@ -90,16 +89,17 @@ protected void initLivySession() throws LivyException {
// Livy 0.2 doesn't return appId and sparkUiUrl in the response, so we need to get them
// explicitly ourselves.
sessionInfo.appId = extractStatementResult(
interpret("sc.applicationId", false).message()
interpret("sc.applicationId", false, false).message()
.get(0).getData());
}

interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false);
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
false, false);
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
sessionInfo.webUIAddress = extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false)
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false)
.message().get(0).getData());
} else {
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
@@ -120,7 +120,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
}

try {
return interpret(st, this.displayAppInfo);
return interpret(st, this.displayAppInfo, true);
} catch (LivyException e) {
LOGGER.error("Fail to interpret:" + st, e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@@ -206,9 +206,26 @@ private SessionInfo getSessionInfo(int sessionId) throws LivyException {
return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
}

public InterpreterResult interpret(String code, boolean displayAppInfo)
public InterpreterResult interpret(String code, boolean displayAppInfo,
boolean appendSessionExpired)
throws LivyException {
StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code));
StatementInfo stmtInfo = null;
boolean sessionExpired = false;
try {
stmtInfo = executeStatement(new ExecuteRequest(code));
} catch (SessionNotFoundException e) {
LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
sessionExpired = true;
// We don't want to create multiple sessions here, because multiple threads may call this
// method concurrently (e.g. LivySparkSQLInterpreter, which uses a ParallelScheduler), so we
// check the session status again inside this synchronized block.
synchronized (this) {
if (isSessionExpired()) {
initLivySession();
}
}
stmtInfo = executeStatement(new ExecuteRequest(code));
}
// poll the statement status until it is available
while (!stmtInfo.isAvailable()) {
try {
@@ -219,7 +236,38 @@ public InterpreterResult interpret(String code, boolean displayAppInfo)
}
stmtInfo = getStatementInfo(stmtInfo.id);
}
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
if (appendSessionExpired) {
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
sessionExpired);
} else {
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
}
}

private boolean isSessionExpired() throws LivyException {
try {
getSessionInfo(sessionInfo.id);
return false;
} catch (SessionNotFoundException e) {
return true;
} catch (LivyException e) {
throw e;
}
}

private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) {
if (sessionExpired) {
InterpreterResult result2 = new InterpreterResult(result.code());
result2.add(InterpreterResult.Type.HTML,
"<font color=\"red\">Previous livy session is expired, new livy session is created. " +
"Paragraphs that depend on this paragraph need to be re-executed!" + "</font>");
for (InterpreterResultMessage message : result.message()) {
result2.add(message.getType(), message.getData());
}
return result2;
} else {
return result;
}
}

private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
@@ -340,7 +388,7 @@ private String callRestAPI(String targetURL, String method, String jsonData)
|| response.getStatusCode().value() == 201
|| response.getStatusCode().value() == 404) {
String responseBody = response.getBody();
if (responseBody.matches("Session '\\d+' not found.")) {
if (responseBody.matches("\"Session '\\d+' not found.\"")) {
throw new SessionNotFoundException(responseBody);
} else {
return responseBody;
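Note: taken together, the changes in this file (a) treat a 404 whose body matches the now-quoted pattern `"Session '\d+' not found."` as a SessionNotFoundException, (b) re-create the Livy session at most once even when several threads hit the expiry at the same time, and (c) prepend a red HTML warning to the result via appendSessionExpire. A minimal, self-contained sketch of that retry shape, using hypothetical stand-ins (LivyClient, submit, createSession) rather than the real Zeppelin classes:

```java
// Sketch only: LivyClient and SessionNotFoundException below are hypothetical stand-ins,
// not the Zeppelin/Livy classes. The point is the retry shape introduced by this diff:
// detect session expiry, re-create the session once under a lock, then re-submit.
public class ExpiryAwareRunnerSketch {

  // Hypothetical stand-in for the exception raised when Livy answers "Session 'N' not found."
  static class SessionNotFoundException extends Exception {}

  // Hypothetical stand-in for the REST client; the real logic lives in BaseLivyInterprereter.
  interface LivyClient {
    String submit(String code) throws SessionNotFoundException;
    boolean isSessionExpired();
    void createSession();
  }

  private final LivyClient client;

  ExpiryAwareRunnerSketch(LivyClient client) {
    this.client = client;
  }

  String run(String code) throws SessionNotFoundException {
    try {
      return client.submit(code);
    } catch (SessionNotFoundException e) {
      // Several threads (e.g. LivySparkSQLInterpreter on a ParallelScheduler) can land here
      // at once, so only re-create the session if it is still missing once we hold the lock:
      // the same double-check the diff performs around initLivySession().
      synchronized (this) {
        if (client.isSessionExpired()) {
          client.createSession();
        }
      }
      // Re-submit against the fresh session; the real code also prepends a red HTML warning
      // so users know that paragraphs depending on earlier session state must be re-run.
      return client.submit(code);
    }
  }

  public static void main(String[] args) throws Exception {
    // Toy client that "expires" on the first submit and succeeds after re-creation.
    LivyClient toy = new LivyClient() {
      private boolean alive = false;
      public String submit(String code) throws SessionNotFoundException {
        if (!alive) throw new SessionNotFoundException();
        return "ok: " + code;
      }
      public boolean isSessionExpired() { return !alive; }
      public void createSession() { alive = true; }
    };
    System.out.println(new ExpiryAwareRunnerSketch(toy).run("1 + 1"));
  }
}
```

The synchronized re-check means only the first thread that observes the expiry re-creates the session; later callers see a live session and simply re-submit.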
LivySparkSQLInterpreter.java
@@ -51,15 +51,15 @@ public void open() {
// As we don't know whether the Livy server uses Spark 1.x or 2.x, we detect SparkSession
// to judge whether it is using Spark 2.
try {
InterpreterResult result = sparkInterpreter.interpret("spark", false);
InterpreterResult result = sparkInterpreter.interpret("spark", false, false);
if (result.code() == InterpreterResult.Code.SUCCESS &&
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
sparkInterpreter.getSessionInfo().id);
isSpark2 = true;
} else {
// spark 1.x
result = sparkInterpreter.interpret("sqlContext", false);
result = sparkInterpreter.interpret("sqlContext", false, false);
if (result.code() == InterpreterResult.Code.SUCCESS) {
LOGGER.info("sqlContext is detected.");
} else if (result.code() == InterpreterResult.Code.ERROR) {
@@ -68,7 +68,7 @@
LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", false);
+ "import sqlContext.implicits._", false, false);
if (result.code() == InterpreterResult.Code.ERROR) {
throw new LivyException("Fail to create SQLContext," +
result.message().get(0).getData());
@@ -113,37 +113,44 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
} else {
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
}
InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo);

if (res.code() == InterpreterResult.Code.SUCCESS) {
StringBuilder resMsg = new StringBuilder();
resMsg.append("%table ");
String[] rows = res.message().get(0).getData().split("\n");
String[] headers = rows[1].split("\\|");
for (int head = 1; head < headers.length; head++) {
resMsg.append(headers[head].trim()).append("\t");
}
resMsg.append("\n");
if (rows[3].indexOf("+") == 0) {

} else {
for (int cols = 3; cols < rows.length - 1; cols++) {
String[] col = rows[cols].split("\\|");
for (int data = 1; data < col.length; data++) {
resMsg.append(col[data].trim()).append("\t");
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true);

if (result.code() == InterpreterResult.Code.SUCCESS) {
InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS);
for (InterpreterResultMessage message : result.message()) {
// Convert the TEXT type to TABLE type. We assume a TEXT message must be the SQL output,
// which is correct for now. Ideally Livy would return a table type directly; that may
// come in a future Livy release.
if (message.getType() == InterpreterResult.Type.TEXT) {
StringBuilder resMsg = new StringBuilder();
String[] rows = message.getData().split("\n");
String[] headers = rows[1].split("\\|");
for (int head = 1; head < headers.length; head++) {
resMsg.append(headers[head].trim()).append("\t");
}
resMsg.append("\n");
if (rows[3].indexOf("+") == 0) {

} else {
Member:

this existed before, but this construct is a bit strange?

if (rows[3].indexOf("+") == 0) {

} else {

Contributor Author:

I don't know about this logic; @prabhjyotsingh is the original author and might know.

for (int cols = 3; cols < rows.length - 1; cols++) {
String[] col = rows[cols].split("\\|");
for (int data = 1; data < col.length; data++) {
resMsg.append(col[data].trim()).append("\t");
}
resMsg.append("\n");
}
}
if (rows[rows.length - 1].indexOf("only") == 0) {
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
}
result2.add(InterpreterResult.Type.TABLE, resMsg.toString());
} else {
result2.add(message.getType(), message.getData());
}
}
if (rows[rows.length - 1].indexOf("only") == 0) {
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
}

return new InterpreterResult(InterpreterResult.Code.SUCCESS,
resMsg.toString()
);
return result2;
} else {
return res;
return result;
}
} catch (Exception e) {
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
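Note: on the construct questioned in the thread above — the text being parsed appears to be the ASCII table printed by DataFrame.show(), where rows[0] and rows[2] are borders and rows[1] is the header row. If rows[3] already starts with "+", the line after the header separator is the closing border, i.e. the result set is empty and there are no data rows to copy, which is why that branch is intentionally left empty. A self-contained sketch of the same parsing (a hypothetical helper, not the Zeppelin code):

```java
// Sketch only: a hypothetical helper mirroring the parsing in LivySparkSQLInterpreter.
// It assumes the input is the ASCII table printed by DataFrame.show(), e.g.
//
//   +---+----+
//   | id|name|
//   +---+----+
//   |  1| foo|
//   +---+----+
//   only showing top 20 rows
//
public class ShowOutputParserSketch {

  static String toTsv(String showOutput) {
    String[] rows = showOutput.split("\n");
    StringBuilder out = new StringBuilder();

    // rows[0] is the top border, rows[1] the header row, rows[2] the separator.
    String[] headers = rows[1].split("\\|");
    for (int head = 1; head < headers.length; head++) {
      out.append(headers[head].trim()).append("\t");
    }
    out.append("\n");

    // If rows[3] already starts with "+", it is the closing border: the result set is
    // empty and there are no data rows to copy -- the empty branch asked about above.
    if (rows[3].indexOf("+") != 0) {
      for (int i = 3; i < rows.length - 1; i++) {
        String[] cols = rows[i].split("\\|");
        for (int c = 1; c < cols.length; c++) {
          out.append(cols[c].trim()).append("\t");
        }
        out.append("\n");
      }
    }

    // show() appends "only showing top N rows" when it truncates the output.
    if (rows[rows.length - 1].indexOf("only") == 0) {
      out.append(rows[rows.length - 1]);
    }
    return out.toString();
  }

  public static void main(String[] args) {
    String sample = "+---+----+\n| id|name|\n+---+----+\n|  1| foo|\n+---+----+";
    System.out.println(toTsv(sample));
  }
}
```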