diff --git a/docs/assets/themes/zeppelin/img/screenshots/conf_interpreter.png b/docs/assets/themes/zeppelin/img/screenshots/conf_interpreter.png new file mode 100644 index 00000000000..156c3575c9b Binary files /dev/null and b/docs/assets/themes/zeppelin/img/screenshots/conf_interpreter.png differ diff --git a/docs/usage/interpreter/overview.md b/docs/usage/interpreter/overview.md index ee0c4d796c5..dd5ed220c88 100644 --- a/docs/usage/interpreter/overview.md +++ b/docs/usage/interpreter/overview.md @@ -131,3 +131,16 @@ Before 0.8.0, Zeppelin don't have lifecycle management on interpreter. User have `NullLifecycleManager` will do nothing, user need to control the lifecycle of interpreter by themselves as before. `TimeoutLifecycleManager` will shutdown interpreters after interpreter idle for a while. By default, the idle threshold is 1 hour. User can change it via `zeppelin.interpreter.lifecyclemanager.timeout.threshold`. `TimeoutLifecycleManager` is the default lifecycle manager, user can change it via `zeppelin.interpreter.lifecyclemanager.class`. + + +## Generic ConfInterpreter + +Zeppelin's interpreter setting is shared by all users and notes, if you want to have different setting you have to create new interpreter, e.g. you can create `spark_jar1` for running spark with dependency jar1 and `spark_jar2` for running spark with dependency jar2. +This approach works, but not so convenient. `ConfInterpreter` can provide more fine-grained control on interpreter setting and more flexibility. + +`ConfInterpreter` is a generic interpreter that could be used by any interpreters. The input format should be property file format. +It can be used to make custom setting for any interpreter. But it requires to run before interpreter process launched. And when interpreter process is launched is determined by interpreter mode setting. +So users needs to understand the ([interpreter mode setting ](../usage/interpreter/interpreter_bindings_mode.html) of Zeppelin and be aware when interpreter process is launched. E.g. If we set spark interpreter setting as isolated per note. Under this setting, each note will launch one interpreter process. +In this scenario, user need to put `ConfInterpreter` as the first paragraph as the below example. Otherwise the customized setting can not be applied (Actually it would report ERROR) + + diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 56056d65cc6..615675544ec 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -535,6 +535,8 @@ public void testSparkZeppelinContextDynamicForms() throws IOException { assertEquals("1", result[1]); assertEquals("items: Seq[Object] = Buffer(2)", result[2]); assertEquals("2", result[3]); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test @@ -568,5 +570,33 @@ public void testPySparkZeppelinContextDynamicForms() throws IOException { assertEquals("default_name", result[0]); assertEquals("1", result[1]); assertEquals("2", result[2]); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void testConfInterpreter() throws IOException { + Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + + Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + p1.setConfig(config); + p1.setText("%spark\nimport com.databricks.spark.csv._"); + p1.setAuthenticationInfo(anonymous); + note.run(p1.getId()); + + waitForFinish(p1); + assertEquals(Status.FINISHED, p1.getStatus()); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java new file mode 100644 index 00000000000..d50c57b4da5 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ConfInterpreter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; + +/** + * Special Interpreter for Interpreter Configuration customization. It is attached to each + * InterpreterGroup implicitly by Zeppelin. + */ +public class ConfInterpreter extends Interpreter { + + private static Logger LOGGER = LoggerFactory.getLogger(ConfInterpreter.class); + + private String interpreterGroupId; + private InterpreterSetting interpreterSetting; + + + public ConfInterpreter(Properties properties, + String interpreterGroupId, + InterpreterSetting interpreterSetting) { + super(properties); + this.interpreterGroupId = interpreterGroupId; + this.interpreterSetting = interpreterSetting; + } + + @Override + public void open() throws InterpreterException { + + } + + @Override + public void close() throws InterpreterException { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { + + try { + Properties finalProperties = new Properties(); + finalProperties.putAll(getProperties()); + Properties newProperties = new Properties(); + newProperties.load(new StringReader(st)); + finalProperties.putAll(newProperties); + LOGGER.debug("Properties for InterpreterGroup: " + interpreterGroupId + " is " + + finalProperties); + interpreterSetting.setInterpreterGroupProperties(interpreterGroupId, finalProperties); + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } catch (IOException e) { + LOGGER.error("Fail to update interpreter setting", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + + } + + @Override + public FormType getFormType() throws InterpreterException { + return null; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 26fcd8e93d4..d5ff947ad0c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -138,9 +138,11 @@ public class InterpreterSetting { // launcher in future when we have other launcher implementation. e.g. third party launcher // service like livy private transient InterpreterLauncher launcher; - /////////////////////////////////////////////////////////////////////////////////////////// private transient LifecycleManager lifecycleManager; + /////////////////////////////////////////////////////////////////////////////////////////// + + /** * Builder class for InterpreterSetting @@ -648,12 +650,11 @@ public void clearNoteIdAndParaMap() { /////////////////////////////////////////////////////////////////////////////////////// // This is the only place to create interpreters. For now we always create multiple interpreter // together (one session). We don't support to create single interpreter yet. - List createInterpreters(String user, String sessionId) { + List createInterpreters(String user, String interpreterGroupId, String sessionId) { List interpreters = new ArrayList<>(); List interpreterInfos = getInterpreterInfos(); for (InterpreterInfo info : interpreterInfos) { - Interpreter interpreter = null; - interpreter = new RemoteInterpreter(getJavaProperties(), sessionId, + Interpreter interpreter = new RemoteInterpreter(getJavaProperties(), sessionId, info.getClassName(), user, lifecycleManager); if (info.isDefaultInterpreter()) { interpreters.add(0, interpreter); @@ -663,15 +664,17 @@ List createInterpreters(String user, String sessionId) { LOGGER.info("Interpreter {} created for user: {}, sessionId: {}", interpreter.getClassName(), user, sessionId); } + interpreters.add(new ConfInterpreter(getJavaProperties(), interpreterGroupId, this)); return interpreters; } - synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException { + synchronized RemoteInterpreterProcess createInterpreterProcess(Properties properties) + throws IOException { if (launcher == null) { createLauncher(); } InterpreterLaunchContext launchContext = new - InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, group, name); + InterpreterLaunchContext(properties, option, interpreterRunner, id, group, name); RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext); process.setRemoteInterpreterEventPoller( new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener)); @@ -716,6 +719,11 @@ private String getInterpreterClassFromInterpreterSetting(String replName) { return info.getClassName(); } } + //TODO(zjffdu) It requires user can not create interpreter with name `conf`, + // conf is a reserved word of interpreter name + if (replName.equals("conf")) { + return ConfInterpreter.class.getName(); + } return null; } @@ -728,6 +736,29 @@ private ManagedInterpreterGroup createInterpreterGroup(String groupId) { return interpreterGroup; } + /** + * Throw exception when interpreter process has already launched + * + * @param interpreterGroupId + * @param properties + * @throws IOException + */ + public void setInterpreterGroupProperties(String interpreterGroupId, Properties properties) + throws IOException { + ManagedInterpreterGroup interpreterGroup = this.interpreterGroups.get(interpreterGroupId); + for (List session : interpreterGroup.sessions.values()) { + for (Interpreter intp : session) { + if (!intp.getProperties().equals(properties) && + interpreterGroup.getRemoteInterpreterProcess() != null && + interpreterGroup.getRemoteInterpreterProcess().isRunning()) { + throw new IOException("Can not change interpreter properties when interpreter process " + + "has already been launched"); + } + intp.setProperties(properties); + } + } + } + private void loadInterpreterDependencies() { setStatus(Status.DOWNLOADING_DEPENDENCIES); setErrorReason(null); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index 219204f0417..2378f140daa 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Properties; /** * ManagedInterpreterGroup runs under zeppelin server @@ -54,10 +55,11 @@ public InterpreterSetting getInterpreterSetting() { return interpreterSetting; } - public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException { + public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Properties properties) + throws IOException { if (remoteInterpreterProcess == null) { LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId()); - remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(); + remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(properties); } return remoteInterpreterProcess; } @@ -131,7 +133,7 @@ public synchronized List getOrCreateSession(String user, String ses if (sessions.containsKey(sessionId)) { return sessions.get(sessionId); } else { - List interpreters = interpreterSetting.createInterpreters(user, sessionId); + List interpreters = interpreterSetting.createInterpreters(user, id, sessionId); for (Interpreter interpreter : interpreters) { interpreter.setInterpreterGroup(this); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 4ad36cf1b28..6defd9ba825 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -25,6 +25,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.ConfInterpreter; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; @@ -101,7 +102,7 @@ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() thr return this.interpreterProcess; } ManagedInterpreterGroup intpGroup = getInterpreterGroup(); - this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(); + this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(properties); synchronized (interpreterProcess) { if (!interpreterProcess.isRunning()) { interpreterProcess.start(this.getUserName(), false); @@ -130,7 +131,9 @@ public void open() throws InterpreterException { for (Interpreter interpreter : getInterpreterGroup() .getOrCreateSession(this.getUserName(), sessionId)) { try { - ((RemoteInterpreter) interpreter).internal_create(); + if (!(interpreter instanceof ConfInterpreter)) { + ((RemoteInterpreter) interpreter).internal_create(); + } } catch (IOException e) { throw new InterpreterException(e); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 5ec132931f6..32b9b73263c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -351,6 +351,7 @@ public boolean execute(boolean blocking) { setStatus(Job.Status.ERROR); throw intpException; } + setStatus(Status.READY); if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) { setAuthenticationInfo(getAuthenticationInfo()); interpreter.getScheduler().submit(this); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java new file mode 100644 index 00000000000..4d74c7cbfb5 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import com.sun.net.httpserver.Authenticator; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class ConfInterpreterTest extends AbstractInterpreterTest { + + @Test + public void testCorrectConf() throws IOException, InterpreterException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter); + ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf"); + + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap(), new GUI(), new GUI(), + null, null, new ArrayList(), null); + InterpreterResult result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code); + + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test"); + remoteInterpreter.interpret("hello world", context); + assertEquals(7, remoteInterpreter.getProperties().size()); + assertEquals("new_value", remoteInterpreter.getProperty("property_1")); + assertEquals("dummy_value", remoteInterpreter.getProperty("new_property")); + assertEquals("value_3", remoteInterpreter.getProperty("property_3")); + + // rerun the paragraph with the same properties would result in SUCCESS + result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code); + + // run the paragraph with the same properties would result in ERROR + result = confInterpreter.interpret("property_1\tnew_value_2\nnew_property\tdummy_value", context); + assertEquals(InterpreterResult.Code.ERROR, result.code); + } + + @Test + public void testEmptyConf() throws IOException, InterpreterException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter); + ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf"); + + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap(), new GUI(), new GUI(), + null, null, new ArrayList(), null); + InterpreterResult result = confInterpreter.interpret("", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code); + + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test"); + assertEquals(6, remoteInterpreter.getProperties().size()); + assertEquals("value_1", remoteInterpreter.getProperty("property_1")); + assertEquals("value_3", remoteInterpreter.getProperty("property_3")); + } + + + @Test + public void testRunningAfterOtherInterpreter() throws IOException, InterpreterException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter); + ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf"); + + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap(), new GUI(), new GUI(), + null, null, new ArrayList(), null); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test"); + InterpreterResult result = remoteInterpreter.interpret("hello world", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code); + + result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context); + assertEquals(InterpreterResult.Code.ERROR, result.code); + } + +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroupTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroupTest.java index 74bd2010855..aa7374991b2 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroupTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroupTest.java @@ -62,7 +62,7 @@ public void testInterpreterGroup() { // create session_1 List interpreters = interpreterGroup.getOrCreateSession("user1", "session_1"); - assertEquals(2, interpreters.size()); + assertEquals(3, interpreters.size()); assertEquals(EchoInterpreter.class.getName(), interpreters.get(0).getClassName()); assertEquals(DoubleEchoInterpreter.class.getName(), interpreters.get(1).getClassName()); assertEquals(1, interpreterGroup.getSessionNum()); @@ -73,7 +73,7 @@ public void testInterpreterGroup() { // create session_2 List interpreters2 = interpreterGroup.getOrCreateSession("user1", "session_2"); - assertEquals(2, interpreters2.size()); + assertEquals(3, interpreters2.size()); assertEquals(EchoInterpreter.class.getName(), interpreters2.get(0).getClassName()); assertEquals(DoubleEchoInterpreter.class.getName(), interpreters2.get(1).getClassName()); assertEquals(2, interpreterGroup.getSessionNum());