Skip to content
Closed
86 changes: 53 additions & 33 deletions spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.*;

import org.apache.spark.HttpServer;
import org.apache.spark.SparkConf;
Expand Down Expand Up @@ -163,32 +159,6 @@ private boolean useHiveContext() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
}

public SQLContext getSQLContext() {
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}
}

return sqlc;
}

public DependencyResolver getDependencyResolver() {
if (dep == null) {
dep = new DependencyResolver(intp, sc, getProperty("zeppelin.dep.localrepo"));
Expand All @@ -213,6 +183,45 @@ private DepInterpreter getDepInterpreter() {
return null;
}

private boolean useCassandraContext() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.useCassandraContext"));
}

private SQLContext loadCustomContext(final String contextName) {
Constructor<?> hc;
SQLContext context;
try {
hc = getClass().getClassLoader().loadClass(contextName)
.getConstructor(SparkContext.class);
context = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create " + contextName + ". Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
context = new SQLContext(getSparkContext());
}
return context;
}

public SQLContext getSQLContext() {
if (sqlc == null) {
if (useCassandraContext()) {
sqlc = loadCustomContext("org.apache.spark.sql.cassandra.CassandraSQLContext");
logger.debug("Loading Cassandra SQL Context");
} else if (useHiveContext()) {
sqlc = loadCustomContext("org.apache.spark.sql.hive.HiveContext");
logger.debug("Loading Hive SQL Context");
} else {
sqlc = new SQLContext(getSparkContext());
logger.debug("Loading Standard SQL Context");
}
}
return sqlc;
}

public SparkContext createSparkContext() {
System.err.println("------ Create new SparkContext " + getProperty("master") + " -------");

Expand Down Expand Up @@ -240,12 +249,23 @@ public SparkContext createSparkContext() {
}
}

SparkConf conf =
new SparkConf()
SparkConf conf = new SparkConf()
.setMaster(getProperty("master"))
.setAppName(getProperty("spark.app.name"))
.set("spark.repl.class.uri", classServerUri);

if (useCassandraContext()) {
conf.set("spark.cassandra.connection.host", getProperty("spark.cassandra.connection.host"));

if (getProperty("spark.cassandra.auth.username") != null) {
conf.set("spark.cassandra.auth.username", getProperty("spark.cassandra.auth.username"));
}

if (getProperty("spark.cassandra.auth.password") != null) {
conf.set("spark.cassandra.auth.password", getProperty("spark.cassandra.auth.password"));
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think line 268 sets all property into conf that property starts with 'spark.'. So isn't it duplicated?


if (jars.length > 0) {
conf.setJars(jars);
}
Expand Down