Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ public class LivyHelper {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
HashMap<String, Object> paragraphHttpMap = new HashMap<>();
Properties property;
String type;
String SPARK = "spark";
String SPARKR = "sparkr";
String PYSPARK = "pyspark";

LivyHelper(Properties property) {
LivyHelper(Properties property, String type) {
this.property = property;
this.type = type;
}

public Integer createSession(InterpreterContext context, String kind) throws Exception {
Expand Down Expand Up @@ -132,7 +137,6 @@ public Integer createSession(InterpreterContext context, String kind) throws Exc

public InterpreterResult interpretInput(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap,
LivyOutputStream out) {
try {
String[] lines = stringLines.split("\n");
Expand Down Expand Up @@ -182,7 +186,7 @@ public InterpreterResult interpretInput(String stringLines,

InterpreterResult res;
try {
res = interpret(incomplete + s, context, userSessionMap);
res = interpret(incomplete + s, context);
} catch (Exception e) {
LOGGER.error("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
Expand Down Expand Up @@ -216,8 +220,7 @@ public InterpreterResult interpretInput(String stringLines,
}

public InterpreterResult interpret(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap)
final InterpreterContext context)
throws Exception {
stringLines = stringLines
//for "\n" present in string
Expand All @@ -232,7 +235,7 @@ public InterpreterResult interpret(String stringLines,
if (stringLines.trim().equals("")) {
return new InterpreterResult(Code.SUCCESS, "");
}
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
Map jsonMap = executeCommand(stringLines, context);
Integer id = ((Double) jsonMap.get("id")).intValue();
InterpreterResult res = getResultFromMap(jsonMap);
if (res != null) {
Expand All @@ -244,7 +247,7 @@ public InterpreterResult interpret(String stringLines,
if (paragraphHttpMap.get(context.getParagraphId()) == null) {
return new InterpreterResult(Code.INCOMPLETE, "");
}
jsonMap = getStatusById(context, userSessionMap, id);
jsonMap = getStatusById(context, id);
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
if (interpreterResult != null) {
return interpreterResult;
Expand Down Expand Up @@ -289,10 +292,10 @@ private InterpreterResult getResultFromMap(Map jsonMap) {
return null;
}

private Map executeCommand(String lines, InterpreterContext context,
Map<String, Integer> userSessionMap) throws Exception {
private Map executeCommand(String lines, InterpreterContext context
) throws Exception {
String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ getUserSessionId(context)
+ "/statements",
"POST",
"{\"code\": \"" + lines + "\" }",
Expand All @@ -311,11 +314,29 @@ private Map executeCommand(String lines, InterpreterContext context,
throw e;
}
}

private Integer getUserSessionId(InterpreterContext context) {
Integer sessionId = null;
if (this.type.equalsIgnoreCase(SPARK)){
sessionId = LivySparkSessionMap.getInstance().getSparkUserSession(context
.getAuthenticationInfo().getUser());
}
if (this.type.equalsIgnoreCase(SPARKR)){
sessionId = LivySparkRSessionMap.getInstance().getSparkUserSession(context
.getAuthenticationInfo().getUser());
}
if (this.type.equalsIgnoreCase(PYSPARK)){
sessionId = LivyPySparkSessionMap.getInstance().getSparkUserSession(context
.getAuthenticationInfo().getUser());
}
return sessionId;
}

private Map getStatusById(InterpreterContext context,
Map<String, Integer> userSessionMap, Integer id) throws Exception {
Integer id) throws Exception {

String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ getUserSessionId(context)
+ "/statements/" + id,
"GET", null, context.getParagraphId());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class LivyPySparkInterpreter extends Interpreter {

Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);

protected Map<String, Integer> userSessionMap;
private LivyPySparkSessionMap lsmap;
protected LivyHelper livyHelper;
private String type = "pyspark";


public LivyPySparkInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
livyHelper = new LivyHelper(property);
lsmap = LivyPySparkSessionMap.getInstance();
livyHelper = new LivyHelper(property, type);
}

@Override
Expand All @@ -52,15 +54,15 @@ public void open() {

@Override
public void close() {
livyHelper.closeSession(userSessionMap);
livyHelper.closeSession(lsmap.getSparkUserSessionMap());
}

@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
if (lsmap.getSparkUserSession(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
lsmap.setSparkUserSessionMap(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
Expand All @@ -76,7 +78,7 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}

return livyHelper.interpret(line, interpreterContext, userSessionMap);
return livyHelper.interpret(line, interpreterContext);
} catch (Exception e) {
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.livy;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Livy PySparkSessionMap for Zeppelin.
*/
public class LivyPySparkSessionMap {
private static LivyPySparkSessionMap instance = null;
protected static Map<String, Integer> userSparkSessionMap =
new ConcurrentHashMap<String, Integer>();
private static Object mutex = new Object();


protected LivyPySparkSessionMap() {
// Exists only to defeat instantiation
}
public static LivyPySparkSessionMap getInstance() {
if (instance == null) {
synchronized (mutex){
if (instance == null) instance = new LivyPySparkSessionMap();
}
}
return instance;
}
public void setSparkUserSessionMap(String user, Integer sessionInt) {
userSparkSessionMap.put(user, sessionInt);
}
public void deleteSparkUserSessionMap(String user, Integer sessionInt) {
userSparkSessionMap.remove(user, sessionInt);
}
public void deleteSparkUserSessionMap(String user) {
userSparkSessionMap.remove(user);
}
public Integer getSparkUserSession(String user) {
return userSparkSessionMap.get(user);
}
public Map<String, Integer> getSparkUserSessionMap() {
return userSparkSessionMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,32 @@ public class LivySparkInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class);
private LivyOutputStream out;

protected static Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
private LivySparkSessionMap lsmap;
private String type = "spark";

public LivySparkInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
livyHelper = new LivyHelper(property);
lsmap = LivySparkSessionMap.getInstance();
livyHelper = new LivyHelper(property, type);
out = new LivyOutputStream();
}

protected static Map<String, Integer> getUserSessionMap() {
return userSessionMap;
}

public void setUserSessionMap(Map<String, Integer> userSessionMap) {
this.userSessionMap = userSessionMap;
}

@Override
public void open() {
}

@Override
public void close() {
livyHelper.closeSession(userSessionMap);
livyHelper.closeSession(lsmap.getSparkUserSessionMap());
}

@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
if (lsmap.getSparkUserSession(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
lsmap.setSparkUserSessionMap(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
Expand All @@ -84,7 +77,7 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}

return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
return livyHelper.interpretInput(line, interpreterContext, out);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ public class LivySparkRInterpreter extends Interpreter {

Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class);

protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
private LivySparkRSessionMap lsmap;
private String type = "sparkr";

public LivySparkRInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
livyHelper = new LivyHelper(property);
lsmap = LivySparkRSessionMap.getInstance();
livyHelper = new LivyHelper(property, type);
}

@Override
Expand All @@ -52,15 +53,15 @@ public void open() {

@Override
public void close() {
livyHelper.closeSession(userSessionMap);
livyHelper.closeSession(lsmap.getSparkUserSessionMap());
}

@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
if (lsmap.getSparkUserSession(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
lsmap.setSparkUserSessionMap(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
Expand All @@ -76,7 +77,7 @@ public InterpreterResult interpret(String line, InterpreterContext interpreterCo
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}

return livyHelper.interpret(line, interpreterContext, userSessionMap);
return livyHelper.interpret(line, interpreterContext);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.livy;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Livy SparkRSessionMap for Zeppelin.
*/
public class LivySparkRSessionMap {
private static LivySparkRSessionMap instance = null;
protected static Map<String, Integer> userSparkSessionMap =
new ConcurrentHashMap<String, Integer>();
private static Object mutex = new Object();


protected LivySparkRSessionMap() {
// Exists only to defeat instantiation
}
public static LivySparkRSessionMap getInstance() {
if (instance == null) {
synchronized (mutex){
if (instance == null) instance = new LivySparkRSessionMap();
}
}
return instance;
}
public void setSparkUserSessionMap(String user, Integer sessionInt) {
userSparkSessionMap.put(user, sessionInt);
}
public void deleteSparkUserSessionMap(String user, Integer sessionInt) {
userSparkSessionMap.remove(user, sessionInt);
}
public void deleteSparkUserSessionMap(String user) {
userSparkSessionMap.remove(user);
}
public Integer getSparkUserSession(String user) {
return userSparkSessionMap.get(user);
}
public Map<String, Integer> getSparkUserSessionMap() {
return userSparkSessionMap;
}
}
Loading