diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index a15b046efeb..f3f9dec9b41 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -72,7 +72,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
     properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl/{{applicationId}}");
     // disable color output for easy testing
     properties.setProperty("zeppelin.spark.scala.color", "false");
     properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
@@ -180,7 +180,8 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
     // spark job url is sent
     ArgumentCaptor onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class);
     verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture());
-    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl"));
+    assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/"
+        + interpreter.getJavaSparkContext().sc().applicationId()));
 
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 994c7ca5510..0361c940c1e 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -237,6 +237,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
       case None =>
     }
 
+    initSparkWebUrl()
+
     val hiveSiteExisted: Boolean =
       Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
     val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false)
@@ -306,7 +308,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
       case Some(url) => sparkUrl = url
       case None =>
     }
-    useYarnProxyURLIfNeeded()
+
+    initSparkWebUrl()
 
     bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
     bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
@@ -321,6 +324,15 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     scalaInterpret("print(\"\")")
   }
 
+  private def initSparkWebUrl(): Unit = {
+    val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
+    if (!StringUtils.isBlank(webUiUrl)) {
+      this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId);
+    } else {
+      useYarnProxyURLIfNeeded()
+    }
+  }
+
   protected def createZeppelinContext(): Unit = {
 
     var sparkShims: SparkShims = null
@@ -329,13 +341,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     } else {
       sparkShims = SparkShims.getInstance(sc.version, properties, sc)
    }
-    var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
-    if (StringUtils.isBlank(webUiUrl)) {
-      webUiUrl = sparkUrl;
-    }
-    useYarnProxyURLIfNeeded()
-    sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
 
     z = new SparkZeppelinContext(sc, sparkShims, interpreterGroup.getInterpreterHookRegistry,
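
Note (reviewer sketch, not part of the patch): the core of this change is that the new initSparkWebUrl() resolves a {{applicationId}} placeholder in the zeppelin.spark.uiWebUrl property against SparkContext.applicationId, falling back to useYarnProxyURLIfNeeded() only when the property is blank. A minimal standalone Scala sketch of that substitution follows; the demo object name and the sample application id are hypothetical, only the replace call mirrors the patch.

object UiWebUrlPlaceholderDemo {
  // Mirrors the String.replace call in BaseSparkScalaInterpreter.initSparkWebUrl.
  def resolve(webUiUrl: String, applicationId: String): String =
    webUiUrl.replace("{{applicationId}}", applicationId)

  def main(args: Array[String]): Unit = {
    // "local-1631525932" stands in for a real SparkContext.applicationId.
    println(resolve("fake_spark_weburl/{{applicationId}}", "local-1631525932"))
    // prints: fake_spark_weburl/local-1631525932
  }
}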