diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b1e1baf039a..c7d449f4a93 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -148,6 +148,10 @@ public boolean isSpark1() {
     return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
   }
 
+  public boolean isSpark3() {
+    return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
+  }
+
   public JavaSparkContext getJavaSparkContext() {
     return sparkInterpreter.getJavaSparkContext();
   }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index f180799e029..945aa40b7c2 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -221,4 +221,8 @@ public Object getSQLContext() {
   public boolean isSpark1() {
     return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
   }
+
+  public boolean isSpark3() {
+    return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
+  }
 }
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index 5d199bc0af8..4b9f67f8b14 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -39,6 +39,11 @@
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 
 intp = gateway.entry_point
+
+import warnings  # review fix: 'warnings' was used below without a visible import in this file
+if intp.isSpark3():
+    warnings.filterwarnings(action='ignore', module='pyspark.util')
+
 jsc = intp.getJavaSparkContext()
 
 java_import(gateway.jvm, "org.apache.spark.sql.*")
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index 7132b8aae88..9a02cd21401 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+import warnings
+
 from py4j.java_gateway import java_import
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
@@ -24,6 +26,9 @@
 
 intp = gateway.entry_point
 
+if intp.isSpark3():
+    warnings.filterwarnings(action='ignore', module='pyspark.util')
+
 jsc = intp.getJavaSparkContext()
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 7dc888f39d8..617587af82c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -187,6 +187,8 @@ && getDeployMode().equals("cluster")) {
     } else {
       LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified");
     }
+
+    env.put("PYSPARK_PIN_THREAD", "true");
     LOGGER.debug("buildEnvFromProperties: " + env);
     return env;
   }