Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public class SparkInterpreter extends AbstractInterpreter {

private SparkVersion sparkVersion;
private boolean enableSupportedVersionCheck;
private String sparkUrl;


public SparkInterpreter(Properties properties) {
super(properties);
Expand Down Expand Up @@ -109,11 +107,6 @@ public void open() throws InterpreterException {
}
sqlContext = this.innerInterpreter.getSqlContext();
sparkSession = this.innerInterpreter.getSparkSession();
sparkUrl = this.innerInterpreter.getSparkUrl();
String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
if (!StringUtils.isBlank(sparkUrlProp)) {
sparkUrl = sparkUrlProp;
}

SESSION_NUM.incrementAndGet();
} catch (Exception e) {
Expand Down Expand Up @@ -260,10 +253,6 @@ private List<String> getDependencyFiles() throws InterpreterException {
return depFiles;
}

public String getSparkUIUrl() {
return sparkUrl;
}

public boolean isUnsupportedSparkVersion() {
return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class SparkInterpreterTest {
Expand Down Expand Up @@ -89,8 +90,6 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
interpreter.open();

assertEquals("fake_spark_weburl", interpreter.getSparkUIUrl());

InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("a: String = hello world\n", output);
Expand Down Expand Up @@ -181,7 +180,9 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
assertEquals("pid_2", captorEvent.getValue().get("paraId"));

// spark job url is sent
verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
ArgumentCaptor<Map> onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class);
verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture());
assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl"));

// case class
result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.net.URLClassLoader
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
Expand Down Expand Up @@ -303,7 +304,11 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,

protected def createZeppelinContext(): Unit = {
val sparkShims = SparkShims.getInstance(sc.version, properties)
sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
if (StringUtils.isBlank(webUiUrl)) {
webUiUrl = sparkUrl;
}
sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get)

z = new SparkZeppelinContext(sc, sparkShims,
interpreterGroup.getInterpreterHookRegistry,
Expand Down