```diff
@@ -118,7 +118,7 @@ public class SparkInterpreter extends Interpreter {
 
   private Map<String, Object> binder;
   private SparkVersion sparkVersion;
-  private File outputDir; // class outputdir for scala 2.11
+  private static File outputDir; // class outputdir for scala 2.11
  private Object classServer; // classserver for scala 2.11
 
 
```
```diff
@@ -572,8 +572,11 @@ public void open() {
      sparkReplClassDir = System.getProperty("java.io.tmpdir");
    }
 
-    outputDir = createTempDir(sparkReplClassDir);
-
+    synchronized (sharedInterpreterLock) {
+      if (outputDir == null) {
+        outputDir = createTempDir(sparkReplClassDir);
+      }
+    }
    argList.add("-Yrepl-class-based");
    argList.add("-Yrepl-outdir");
    argList.add(outputDir.getAbsolutePath());
```
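A note on this hunk: outputDir is now static and created at most once under sharedInterpreterLock. In scoped mode several SparkInterpreter instances share one JVM and one SparkContext, so they must also share the single -Yrepl-outdir that the Scala 2.11 REPL writes generated classes into (the directory that classServer serves). A minimal sketch of the pattern, with hypothetical names (OutputDirHolder, getOrCreate); sharedInterpreterLock and createTempDir mirror the diff above:

```java
import java.io.File;

// Sketch of the shared, lazily created REPL class-output directory.
final class OutputDirHolder {
  private static final Object sharedInterpreterLock = new Object();
  private static File outputDir; // shared by every interpreter instance in the JVM

  static File getOrCreate(String sparkReplClassDir) {
    synchronized (sharedInterpreterLock) {
      if (outputDir == null) {
        // the first instance creates the dir; later scoped instances reuse it
        outputDir = createTempDir(sparkReplClassDir);
      }
      return outputDir;
    }
  }

  // stand-in for the interpreter's createTempDir helper
  private static File createTempDir(String base) {
    File dir = new File(base, "spark-repl-classes-" + System.nanoTime());
    dir.mkdirs();
    return dir;
  }
}
```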
```diff
@@ -1276,7 +1279,12 @@ public void close() {
    logger.info("Close interpreter");
 
    if (numReferenceOfSparkContext.decrementAndGet() == 0) {
-      sc.stop();
+      if (sparkSession != null) {
+        Utils.invokeMethod(sparkSession, "stop");
+      } else if (sc != null) {
+        sc.stop();
+      }
+      sparkSession = null;
      sc = null;
      if (classServer != null) {
        Utils.invokeMethod(classServer, "stop");
```
Contributor Author: Set sparkSession to null so that it will be created again if the interpreter runs in scoped mode.
Member: stop() should be called on sparkSession before sc.stop() (see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession). As of now this is fine, since sparkSession.stop() simply calls sc.stop(), but that could change.

Contributor Author: Good catch. When sparkSession is not null (Spark 2.0), sparkSession.stop() should be called first.

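To summarize the thread: stop the SparkSession first when it exists (reflectively, since the SparkSession class is absent on a Spark 1.x classpath, which is why the interpreter holds it as a plain Object), and fall back to sc.stop() on older versions. A minimal sketch of that ordering, with hypothetical names (SparkStopper, invokeNoArg); Zeppelin's Utils.invokeMethod plays the reflective role here:

```java
import java.lang.reflect.Method;

// Sketch of the agreed shutdown order: SparkSession (Spark 2.x) first,
// SparkContext (1.x) as the fallback. Both are held as Object so the
// sketch compiles without pinning a Spark version.
final class SparkStopper {
  static void stop(Object sparkSession, Object sc) throws Exception {
    if (sparkSession != null) {
      // Spark 2.x: SparkSession.stop() currently just calls sc.stop(),
      // but per the review it must run first in case that ever changes.
      invokeNoArg(sparkSession, "stop");
    } else if (sc != null) {
      invokeNoArg(sc, "stop"); // Spark 1.x path
    }
  }

  private static void invokeNoArg(Object target, String name) throws Exception {
    Method m = target.getClass().getMethod(name);
    m.invoke(target);
  }
}
```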
spark/src/main/resources/python/zeppelin_pyspark.py (4 additions, 1 deletion)
```diff
@@ -218,7 +218,10 @@ def getCompletion(self, text_value):
 jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-sqlc = SQLContext(sc, intp.getSQLContext())
+if sparkVersion.isSpark2():
+  sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+else:
+  sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
 sqlContext = sqlc
 
 if sparkVersion.isSpark2():
```
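Background on this branch: the Python SQLContext constructor differs between the two release lines. In Spark 1.x it is SQLContext(sparkContext, sqlContext=None), with the wrapped Java SQLContext passed as sqlContext; Spark 2.0 renamed that parameter to jsqlContext (the 2.0 SQLContext wraps a SparkSession). Passing the Java object under the wrong keyword raises a TypeError, hence the version check.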
```diff
@@ -154,7 +154,7 @@ protected static void startUp() throws Exception {
      // set spark master and other properties
      sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
      sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
-
+      sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
      // set spark home for pyspark
      sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
      pySpark = true;
```
```diff
@@ -171,10 +171,16 @@ protected static void startUp() throws Exception {
 
      String sparkHome = getSparkHome();
      if (sparkHome != null) {
-        sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
+        if (System.getenv("SPARK_MASTER") != null) {
+          sparkIntpSetting.getProperties().setProperty("master", System.getenv("SPARK_MASTER"));
+        } else {
+          sparkIntpSetting.getProperties()
+              .setProperty("master", "spark://" + getHostname() + ":7071");
+        }
```
Contributor Author: Allow the user to specify SPARK_MASTER so that the tests can run in other modes (like yarn-client).

Member: This is testing code only, but it doesn't seem like we are using this in the tests?

Contributor Author: It is for local system testing, when a user wants to run it in other modes (e.g. yarn-client).
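What the change enables, as a hypothetical helper (resolveSparkMaster and TestMasters are illustrative names, not part of the patch):

```java
// SPARK_MASTER from the environment wins; otherwise fall back to the
// standalone master the test suite starts itself.
final class TestMasters {
  static String resolveSparkMaster(String hostname) {
    String fromEnv = System.getenv("SPARK_MASTER"); // e.g. "yarn-client"
    return (fromEnv != null) ? fromEnv : "spark://" + hostname + ":7071";
  }
}
```

Running the suite with SPARK_MASTER=yarn-client in the environment then points the interpreter at YARN instead of the local standalone master.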

sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
pySpark = true;
Contributor Author: Disable HiveContext here; otherwise the tests hit the issue of multiple Derby instances (each HiveContext starts its own embedded Derby metastore, and concurrent instances collide).

```diff
      sparkR = true;
    }
```
```diff
@@ -194,7 +200,11 @@ private static String getSparkHome() {
  }
 
  private static String getSparkHome() {
-    String sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
+    String sparkHome = System.getenv("SPARK_HOME");
+    if (sparkHome != null) {
+      return sparkHome;
+    }
+    sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
    System.out.println("SPARK HOME detected " + sparkHome);
```
Contributor Author: Allow the user to specify SPARK_HOME so that an existing Spark cluster can be used.
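With this change the lookup order is: SPARK_HOME from the environment first, then the recursive search under ZEPPELIN_HOME as a fallback, so a locally installed Spark distribution can be used instead of whatever the search finds inside the Zeppelin source tree.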

```diff
    return sparkHome;
  }
```
```diff
@@ -135,11 +135,38 @@ public void pySparkTest() throws IOException {
    config.put("enabled", true);
    p.setConfig(config);
    p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
-    // p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
    note.run(p.getId());
    waitForFinish(p);
    assertEquals(Status.FINISHED, p.getStatus());
    assertEquals("55\n", p.getResult().message());
+    if (sparkVersion >= 13) {
+      // run sqlContext test
+      p = note.addParagraph();
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%pyspark from pyspark.sql import Row\n" +
+          "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+          "df.collect()");
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+    }
+    if (sparkVersion >= 20) {
+      // run SparkSession test
+      p = note.addParagraph();
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%pyspark from pyspark.sql import Row\n" +
+          "df=spark.createDataFrame([Row(id=1, age=20)])\n" +
+          "df.collect()");
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+    }
  }
  ZeppelinServer.notebook.removeNote(note.getId(), null);
 }
```
```diff
@@ -166,7 +193,6 @@ public void pySparkAutoConvertOptionTest() throws IOException {
 
    p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
        + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
-    // p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
    note.run(p.getId());
    waitForFinish(p);
    assertEquals(Status.FINISHED, p.getStatus());
```
```diff
@@ -257,6 +283,7 @@ public void pySparkDepLoaderTest() throws IOException {
    assertEquals(Status.FINISHED, p1.getStatus());
    assertEquals("2\n", p1.getResult().message());
  }
+  ZeppelinServer.notebook.removeNote(note.getId(), null);
 }
```

```diff
@@ -270,7 +297,6 @@ private int getSparkVersionNumber(Note note) {
    config.put("enabled", true);
    p.setConfig(config);
    p.setText("%spark print(sc.version)");
-    // p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
    note.run(p.getId());
    waitForFinish(p);
    assertEquals(Status.FINISHED, p.getStatus());
```
zeppelin-server/src/test/resources/log4j.properties (1 addition, 1 deletion)

```diff
@@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-
+log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG
```