Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public Object createSparkSession() {
}

setupConfForPySpark(conf);
setupConfForSparkR(conf);
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
Expand Down Expand Up @@ -443,6 +444,7 @@ public SparkContext createSparkContext_1() {
}
}
setupConfForPySpark(conf);
setupConfForSparkR(conf);
SparkContext sparkContext = new SparkContext(conf);
return sparkContext;
}
Expand Down Expand Up @@ -494,6 +496,35 @@ private void setupConfForPySpark(SparkConf conf) {
}
}

/**
 * Locates {@code sparkr.zip} and registers it as a YARN-distributed archive
 * (aliased as {@code sparkr}) so SparkR is available inside YARN containers.
 *
 * Lookup order: {@code $SPARK_HOME/R/lib} when SPARK_HOME is set, otherwise
 * {@code $ZEPPELIN_HOME/interpreter/spark/R}. If the zip is absent, only a
 * warning is logged and the configuration is left untouched.
 *
 * @param conf the SparkConf to augment with {@code spark.yarn.dist.archives}
 */
private void setupConfForSparkR(SparkConf conf) {
  String sparkHome = new InterpreterProperty("SPARK_HOME", null, null, null).getValue();
  File sparkRLibDir;
  if (sparkHome != null) {
    sparkRLibDir = new File(sparkHome, "R" + File.separator + "lib");
  } else {
    // No SPARK_HOME: fall back to the copy bundled with the Zeppelin distribution.
    String zeppelinHome =
        new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue();
    sparkRLibDir = new File(zeppelinHome,
        "interpreter" + File.separator + "spark" + File.separator + "R");
  }

  File sparkRZip = new File(sparkRLibDir, "sparkr.zip");
  if (!sparkRZip.exists() || !sparkRZip.isFile()) {
    logger.warn("sparkr.zip is not found, sparkr may not work.");
    return;
  }

  // Append to any archives the user already configured; "#sparkr" sets the
  // link name the archive is exposed under in the container working dir.
  String existing = null;
  if (conf.contains("spark.yarn.dist.archives")) {
    existing = conf.get("spark.yarn.dist.archives");
  }
  String archives = (existing == null)
      ? sparkRZip + "#sparkr"
      : existing + "," + sparkRZip + "#sparkr";
  conf.set("spark.yarn.dist.archives", archives);
}

/**
 * Returns the argument as a {@code String} when it already is one,
 * otherwise the empty string. Never returns {@code null}.
 *
 * @param o any object, possibly {@code null}
 * @return {@code o} cast to String, or {@code ""} for non-String (incl. null) input
 */
static final String toString(Object o) {
  if (o instanceof String) {
    return (String) o;
  }
  return "";
}
Expand Down