Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
13 changes: 13 additions & 0 deletions docs/usage/interpreter/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,16 @@ Before 0.8.0, Zeppelin don't have lifecycle management on interpreter. User have
`NullLifecycleManager` will do nothing,
users need to control the lifecycle of interpreters by themselves as before. `TimeoutLifecycleManager` will shut down interpreters after they have been idle for a while. By default, the idle threshold is 1 hour.
Users can change it via `zeppelin.interpreter.lifecyclemanager.timeout.threshold`. `TimeoutLifecycleManager` is the default lifecycle manager; users can change it via `zeppelin.interpreter.lifecyclemanager.class`.


## Generic ConfInterpreter

Zeppelin's interpreter setting is shared by all users and notes. If you want a different setting, you have to create a new interpreter, e.g. you can create `spark_jar1` for running spark with dependency jar1 and `spark_jar2` for running spark with dependency jar2.
This approach works, but it is not very convenient. `ConfInterpreter` provides more fine-grained control over interpreter settings and more flexibility.

`ConfInterpreter` is a generic interpreter that can be used by any interpreter. Its input should be in property file format.
It can be used to customize the setting of any interpreter, but it must run before the interpreter process is launched. When the interpreter process is launched is determined by the interpreter mode setting.
So users need to understand the [interpreter mode setting](../usage/interpreter/interpreter_bindings_mode.html) of Zeppelin and be aware of when the interpreter process is launched. E.g. if we set the spark interpreter setting as isolated per note, each note will launch one interpreter process.
In this scenario, users need to put `ConfInterpreter` in the first paragraph, as in the example below. Otherwise the customized setting cannot be applied (it will actually report an ERROR).
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/screenshots/conf_interpreter.png" width="500px">

Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,8 @@ public void testSparkZeppelinContextDynamicForms() throws IOException {
assertEquals("1", result[1]);
assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
assertEquals("2", result[3]);

ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}

@Test
Expand Down Expand Up @@ -568,5 +570,33 @@ public void testPySparkZeppelinContextDynamicForms() throws IOException {
assertEquals("default_name", result[0]);
assertEquals("1", result[1]);
assertEquals("2", result[2]);

ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}

@Test
public void testConfInterpreter() throws IOException {
  Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
  try {
    // First paragraph: customize the spark interpreter through %spark.conf. It has to run
    // before any %spark paragraph so that the setting is applied before the interpreter
    // process is launched.
    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
    Map config = p.getConfig();
    config.put("enabled", true);
    p.setConfig(config);
    p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
    p.setAuthenticationInfo(anonymous);
    note.run(p.getId());
    waitForFinish(p);
    assertEquals(Status.FINISHED, p.getStatus());

    // Second paragraph: the import only succeeds if the package configured above was
    // picked up by the spark interpreter.
    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
    p1.setConfig(config);
    p1.setText("%spark\nimport com.databricks.spark.csv._");
    p1.setAuthenticationInfo(anonymous);
    note.run(p1.getId());

    waitForFinish(p1);
    assertEquals(Status.FINISHED, p1.getStatus());
  } finally {
    // Remove the note even when an assertion above fails, so that a failure here does not
    // leave the note behind in the shared notebook used by the other tests.
    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.interpreter;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/**
* Special Interpreter for Interpreter Configuration customization. It is attached to each
* InterpreterGroup implicitly by Zeppelin.
*/
public class ConfInterpreter extends Interpreter {

  private static final Logger LOGGER = LoggerFactory.getLogger(ConfInterpreter.class);

  /** Id of the interpreter group whose properties this interpreter customizes. */
  private final String interpreterGroupId;
  /** Setting that owns the interpreter group and receives the customized properties. */
  private final InterpreterSetting interpreterSetting;

  /**
   * Creates a ConfInterpreter bound to one interpreter group.
   *
   * @param properties base properties; paragraph input is overlaid on top of them
   * @param interpreterGroupId id of the interpreter group to customize
   * @param interpreterSetting setting the customized properties are handed to
   */
  public ConfInterpreter(Properties properties,
                         String interpreterGroupId,
                         InterpreterSetting interpreterSetting) {
    super(properties);
    this.interpreterGroupId = interpreterGroupId;
    this.interpreterSetting = interpreterSetting;
  }

  /** Nothing to open. */
  @Override
  public void open() throws InterpreterException {
    // intentionally empty
  }

  /** Nothing to close. */
  @Override
  public void close() throws InterpreterException {
    // intentionally empty
  }

  /**
   * Parses the paragraph text as a property file, overlays it on this interpreter's own
   * properties and passes the merged result to
   * {@code InterpreterSetting#setInterpreterGroupProperties} for the bound interpreter group.
   *
   * @param st paragraph text in {@link Properties} file format; {@code null} is treated as
   *           empty input (no overrides)
   * @param context interpreter context (not used)
   * @return a SUCCESS result when the properties were applied, otherwise an ERROR result
   *         carrying the stack trace of the failure
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context)
      throws InterpreterException {
    try {
      Properties finalProperties = new Properties();
      finalProperties.putAll(getProperties());

      Properties newProperties = new Properties();
      // StringReader rejects null with a NullPointerException; an absent paragraph body
      // simply carries no overrides.
      try (StringReader reader = new StringReader(st == null ? "" : st)) {
        newProperties.load(reader);
      }
      // Values from the paragraph win over the base properties.
      finalProperties.putAll(newProperties);

      LOGGER.debug("Properties for InterpreterGroup: {} is {}",
          interpreterGroupId, finalProperties);
      interpreterSetting.setInterpreterGroupProperties(interpreterGroupId, finalProperties);
      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
    } catch (IOException | IllegalArgumentException e) {
      // Properties.load reports a malformed unicode escape as IllegalArgumentException;
      // surface it to the user the same way as an IOException instead of letting it escape.
      LOGGER.error("Fail to update interpreter setting", e);
      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
    }
  }

  /** Nothing to cancel. */
  @Override
  public void cancel(InterpreterContext context) throws InterpreterException {
    // intentionally empty
  }

  /**
   * @return always {@code null}; this interpreter does not declare a form type
   */
  @Override
  public FormType getFormType() throws InterpreterException {
    return null;
  }

  /**
   * @return always 0; no progress is reported
   */
  @Override
  public int getProgress(InterpreterContext context) throws InterpreterException {
    return 0;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,11 @@ public class InterpreterSetting {
// launcher in future when we have other launcher implementation. e.g. third party launcher
// service like livy
private transient InterpreterLauncher launcher;
///////////////////////////////////////////////////////////////////////////////////////////

private transient LifecycleManager lifecycleManager;
///////////////////////////////////////////////////////////////////////////////////////////



/**
* Builder class for InterpreterSetting
Expand Down Expand Up @@ -648,12 +650,11 @@ public void clearNoteIdAndParaMap() {
///////////////////////////////////////////////////////////////////////////////////////
// This is the only place to create interpreters. For now we always create multiple interpreter
// together (one session). We don't support to create single interpreter yet.
List<Interpreter> createInterpreters(String user, String sessionId) {
List<Interpreter> createInterpreters(String user, String interpreterGroupId, String sessionId) {
List<Interpreter> interpreters = new ArrayList<>();
List<InterpreterInfo> interpreterInfos = getInterpreterInfos();
for (InterpreterInfo info : interpreterInfos) {
Interpreter interpreter = null;
interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
Interpreter interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
info.getClassName(), user, lifecycleManager);
if (info.isDefaultInterpreter()) {
interpreters.add(0, interpreter);
Expand All @@ -663,15 +664,17 @@ List<Interpreter> createInterpreters(String user, String sessionId) {
LOGGER.info("Interpreter {} created for user: {}, sessionId: {}",
interpreter.getClassName(), user, sessionId);
}
interpreters.add(new ConfInterpreter(getJavaProperties(), interpreterGroupId, this));
return interpreters;
}

synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
synchronized RemoteInterpreterProcess createInterpreterProcess(Properties properties)
throws IOException {
if (launcher == null) {
createLauncher();
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, group, name);
InterpreterLaunchContext(properties, option, interpreterRunner, id, group, name);
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
process.setRemoteInterpreterEventPoller(
new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
Expand Down Expand Up @@ -716,6 +719,11 @@ private String getInterpreterClassFromInterpreterSetting(String replName) {
return info.getClassName();
}
}
//TODO(zjffdu) This requires that users can not create an interpreter named `conf`;
// `conf` is a reserved interpreter name
if (replName.equals("conf")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have to hardcode this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I make conf as a reserved interpreter name for this generic ConfInterpreter so that all the interpreters can use it. e.g. %md.conf, %sh.conf, %spark.conf

return ConfInterpreter.class.getName();
}
return null;
}

Expand All @@ -728,6 +736,29 @@ private ManagedInterpreterGroup createInterpreterGroup(String groupId) {
return interpreterGroup;
}

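/**
* Set the given properties on every interpreter of every session in the interpreter group.
* Throws an exception when the properties differ from an interpreter's current properties
* and the interpreter process has already been launched.
*
* @param interpreterGroupId id of the interpreter group to update
* @param properties properties to set on each interpreter of the group
* @throws IOException if the properties would change while the interpreter process is running
*/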
public void setInterpreterGroupProperties(String interpreterGroupId, Properties properties)
throws IOException {
ManagedInterpreterGroup interpreterGroup = this.interpreterGroups.get(interpreterGroupId);
for (List<Interpreter> session : interpreterGroup.sessions.values()) {
for (Interpreter intp : session) {
if (!intp.getProperties().equals(properties) &&
interpreterGroup.getRemoteInterpreterProcess() != null &&
interpreterGroup.getRemoteInterpreterProcess().isRunning()) {
throw new IOException("Can not change interpreter properties when interpreter process " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worry about this? I think this is the way why %dep is so hard to use - the implicit order of interpreter is not very visible to an user, especially with different interpreter isolation mode etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this is what I mention in the documentation. This generic ConfInterpreter is for advanced users that they needs to understand the interpreter mode setting of zeppelin and be aware when interpreter process is launched.

"has already been launched");
}
intp.setProperties(properties);
}
}
}

private void loadInterpreterDependencies() {
setStatus(Status.DOWNLOADING_DEPENDENCIES);
setErrorReason(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

/**
* ManagedInterpreterGroup runs under zeppelin server
Expand All @@ -54,10 +55,11 @@ public InterpreterSetting getInterpreterSetting() {
return interpreterSetting;
}

public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Properties properties)
throws IOException {
if (remoteInterpreterProcess == null) {
LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(properties);
}
return remoteInterpreterProcess;
}
Expand Down Expand Up @@ -131,7 +133,7 @@ public synchronized List<Interpreter> getOrCreateSession(String user, String ses
if (sessions.containsKey(sessionId)) {
return sessions.get(sessionId);
} else {
List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, sessionId);
List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, id, sessionId);
for (Interpreter interpreter : interpreters) {
interpreter.setInterpreterGroup(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.ConfInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
Expand Down Expand Up @@ -101,7 +102,7 @@ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() thr
return this.interpreterProcess;
}
ManagedInterpreterGroup intpGroup = getInterpreterGroup();
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(properties);
synchronized (interpreterProcess) {
if (!interpreterProcess.isRunning()) {
interpreterProcess.start(this.getUserName(), false);
Expand Down Expand Up @@ -130,7 +131,9 @@ public void open() throws InterpreterException {
for (Interpreter interpreter : getInterpreterGroup()
.getOrCreateSession(this.getUserName(), sessionId)) {
try {
((RemoteInterpreter) interpreter).internal_create();
if (!(interpreter instanceof ConfInterpreter)) {
((RemoteInterpreter) interpreter).internal_create();
}
} catch (IOException e) {
throw new InterpreterException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ public boolean execute(boolean blocking) {
setStatus(Job.Status.ERROR);
throw intpException;
}
setStatus(Status.READY);
if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) {
setAuthenticationInfo(getAuthenticationInfo());
interpreter.getScheduler().submit(this);
Expand Down
Loading