diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java index baefabca3cc..fcd22f6a167 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -99,7 +100,7 @@ public static void tearDown() throws IOException { private void testInterpreterBasics() throws IOException, InterpreterException { // test FlinkInterpreter - Interpreter flinkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "flink", "flink"); + Interpreter flinkInterpreter = interpreterFactory.getInterpreter("flink", new ExecutionContext("user1", "note1", "flink")); InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build(); InterpreterResult interpreterResult = flinkInterpreter.interpret("1+1", context); diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java index 01c0acf0f71..9ec23d22a97 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/JdbcIntegrationTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.Lists; import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -69,7 +70,7 @@ public void testMySql() throws InterpreterException, InterruptedException { interpreterSetting.setDependencies(Lists.newArrayList(dependency)); interpreterSettingManager.restart(interpreterSetting.getId()); interpreterSetting.waitForReady(60 * 1000); - Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("user1", "note1", "jdbc", "test"); + Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("jdbc", new ExecutionContext("user1", "note1", "test")); assertNotNull("JdbcInterpreter is null", jdbcInterpreter); InterpreterContext context = new InterpreterContext.Builder() @@ -88,7 +89,7 @@ public void testMySql() throws InterpreterException, InterruptedException { assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData()); // read table_1 from python interpreter - Interpreter pythonInterpreter = interpreterFactory.getInterpreter("user1", "note1", "python", "test"); + Interpreter pythonInterpreter = interpreterFactory.getInterpreter("python", new ExecutionContext("user1", "note1", "test")); assertNotNull("PythonInterpreter is null", pythonInterpreter); context = new InterpreterContext.Builder() diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index bebca317fdb..e0a49906c8a 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.maven.model.Model; import org.apache.maven.model.io.xpp3.MavenXpp3Reader; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -96,7 +97,7 @@ private void testInterpreterBasics() throws IOException, InterpreterException, X sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath()); // test SparkInterpreter - Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark", "test"); + Interpreter sparkInterpreter = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note1", "test")); InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build(); InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context); @@ -113,24 +114,24 @@ private void testInterpreterBasics() throws IOException, InterpreterException, X assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); // test PySparkInterpreter - Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark", "test"); + Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("spark.pyspark", new ExecutionContext("user1", "note1", "test")); interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context); assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); // test IPySparkInterpreter - Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark", "test"); + Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("spark.ipyspark", new ExecutionContext("user1", "note1", "test")); interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context); assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); // test SparkSQLInterpreter - Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql", "test"); + Interpreter sqlInterpreter = interpreterFactory.getInterpreter("spark.sql", new ExecutionContext("user1", "note1", "test")); interpreterResult = sqlInterpreter.interpret("select count(1) as c from test", context); assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); assertEquals("c\n2\n", interpreterResult.message().get(0).getData()); // test SparkRInterpreter - Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.r", "test"); + Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test")); if (isSpark2()) { interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context); } else { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java new file mode 100644 index 00000000000..e2bbe45d3b2 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +public class ExecutionContext { + + private final String user; + private final String noteId; + private final String defaultInterpreterGroup; + private final boolean inCronMode; + + public ExecutionContext(String user, String noteId) { + this(user, noteId, "", false); + } + + public ExecutionContext(String user, String noteId, String defaultInterpreterGroup) { + this(user, noteId, defaultInterpreterGroup, false); + } + + public ExecutionContext(String user, String noteId, String defaultInterpreterGroup, boolean inCronMode) { + this.user = user; + this.noteId = noteId; + this.defaultInterpreterGroup = defaultInterpreterGroup; + this.inCronMode = inCronMode; + } + + public String getUser() { + return user; + } + + public String getNoteId() { + return noteId; + } + + public String getDefaultInterpreterGroup() { + return defaultInterpreterGroup; + } + + public boolean isInCronMode() { + return inCronMode; + } + + @Override + public String toString() { + return "ExecutionContext{" + + "user='" + user + '\'' + + ", noteId='" + noteId + '\'' + + ", defaultInterpreterGroup='" + defaultInterpreterGroup + '\'' + + ", inCronMode=" + inCronMode + + '}'; + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java index 1381adc40cf..f02650b6127 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java @@ -26,9 +26,7 @@ * So access through the interface. */ public interface InterpreterFactoryInterface { - Interpreter getInterpreter(String user, - String noteId, - String replName, - String defaultInterpreterSetting) - throws InterpreterNotFoundException; + + Interpreter getInterpreter(String replName, ExecutionContext executionContext) + throws InterpreterNotFoundException; } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index a25fa06b47d..bb114c32d3d 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.dep.Repository; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterPropertyType; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -200,7 +201,8 @@ public Response restartSetting(String message, @PathParam("settingId") String se if (null == noteId) { interpreterSettingManager.close(settingId); } else { - interpreterSettingManager.restart(settingId, noteId, authenticationService.getPrincipal()); + interpreterSettingManager.restart(settingId, + new ExecutionContext(authenticationService.getPrincipal(), noteId)); } } catch (InterpreterException e) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java index cdde98038e8..3b7e2f2d035 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java @@ -100,7 +100,7 @@ public void setUp() throws Exception { InterpreterSettingManager mockInterpreterSettingManager = mock(InterpreterSettingManager.class); InterpreterFactory mockInterpreterFactory = mock(InterpreterFactory.class); Interpreter mockInterpreter = mock(Interpreter.class); - when(mockInterpreterFactory.getInterpreter(any(), any(), any(), any())) + when(mockInterpreterFactory.getInterpreter(any(), any())) .thenReturn(mockInterpreter); when(mockInterpreter.interpret(eq("invalid_code"), any())) .thenReturn(new InterpreterResult(Code.ERROR, "failed")); diff --git a/zeppelin-web/src/app/notebook/notebook-actionBar.html b/zeppelin-web/src/app/notebook/notebook-actionBar.html index ed44465ff2f..0f246838216 100644 --- a/zeppelin-web/src/app/notebook/notebook-actionBar.html +++ b/zeppelin-web/src/app/notebook/notebook-actionBar.html @@ -300,12 +300,6 @@

{{note.info.cron}}

-
- - After execution stop the interpreter - -
diff --git a/zeppelin-web/src/app/notebook/notebook.controller.js b/zeppelin-web/src/app/notebook/notebook.controller.js index a1311013308..ebab5f8cdb0 100644 --- a/zeppelin-web/src/app/notebook/notebook.controller.js +++ b/zeppelin-web/src/app/notebook/notebook.controller.js @@ -512,12 +512,6 @@ function NotebookCtrl($scope, $route, $routeParams, $location, $rootScope, $scope.setConfig(); }; - /** Set release resource for this note **/ - $scope.setReleaseResource = function(value) { - $scope.note.config.releaseresource = value; - $scope.setConfig(); - }; - /** Update note config **/ $scope.setConfig = function(config) { if (config) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 62e7c0574ea..d5b8f815445 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -17,14 +17,11 @@ package org.apache.zeppelin.interpreter; -import com.google.common.base.Preconditions; import javax.inject.Inject; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * //TODO(zjffdu) considering to move to InterpreterSettingManager * @@ -42,17 +39,15 @@ public InterpreterFactory(InterpreterSettingManager interpreterSettingManager) { } @Override - public Interpreter getInterpreter(String user, - String noteId, - String replName, - String defaultInterpreterSetting) + public Interpreter getInterpreter(String replName, + ExecutionContext executionContext) throws InterpreterNotFoundException { if (StringUtils.isBlank(replName)) { // Get the default interpreter of the defaultInterpreterSetting InterpreterSetting defaultSetting = - interpreterSettingManager.getByName(defaultInterpreterSetting); - return defaultSetting.getDefaultInterpreter(user, noteId); + interpreterSettingManager.getByName(executionContext.getDefaultInterpreterGroup()); + return defaultSetting.getDefaultInterpreter(executionContext); } String[] replNameSplits = replName.split("\\."); @@ -61,7 +56,7 @@ public Interpreter getInterpreter(String user, String name = replNameSplits[1]; InterpreterSetting setting = interpreterSettingManager.getByName(group); if (null != setting) { - Interpreter interpreter = setting.getInterpreter(user, noteId, name); + Interpreter interpreter = setting.getInterpreter(executionContext, name); if (null != interpreter) { return interpreter; } @@ -72,9 +67,9 @@ public Interpreter getInterpreter(String user, } else if (replNameSplits.length == 1){ // first assume group is omitted InterpreterSetting setting = - interpreterSettingManager.getByName(defaultInterpreterSetting); + interpreterSettingManager.getByName(executionContext.getDefaultInterpreterGroup()); if (setting != null) { - Interpreter interpreter = setting.getInterpreter(user, noteId, replName); + Interpreter interpreter = setting.getInterpreter(executionContext, replName); if (null != interpreter) { return interpreter; } @@ -83,7 +78,7 @@ public Interpreter getInterpreter(String user, // then assume interpreter name is omitted setting = interpreterSettingManager.getByName(replName); if (null != setting) { - return setting.getDefaultInterpreter(user, noteId); + return setting.getDefaultInterpreter(executionContext); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index ff5b9e0bbca..6b3eba144b4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -56,6 +56,8 @@ import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -85,6 +87,9 @@ public class InterpreterSetting { private static final Map DEFAULT_EDITOR = ImmutableMap.of( "language", (Object) "text", "editOnDblClick", false); + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"); + public static String PARAGRAPH_CONFIG_RUNONSELECTIONCHANGE = "runOnSelectionChange"; public static String PARAGRAPH_CONFIG_TITLE = "title"; @@ -422,16 +427,21 @@ public String getGroup() { return group; } - private String getInterpreterGroupId(String user, String noteId) { + private String getInterpreterGroupId(ExecutionContext executionContext) { + if (executionContext.isInCronMode()) { + return "cron-" + name + "-" + executionContext.getNoteId() + "-" + + DATE_TIME_FORMATTER.format(LocalDateTime.now()); + } + List keys = new ArrayList<>(); if (option.isExistingProcess) { keys.add(Constants.EXISTING_PROCESS); } else if (getOption().isIsolated()) { if (option.perUserIsolated()) { - keys.add(user); + keys.add(executionContext.getUser()); } if (option.perNoteIsolated()) { - keys.add(noteId); + keys.add(executionContext.getNoteId()); } } else { keys.add(SHARED_PROCESS); @@ -441,16 +451,16 @@ private String getInterpreterGroupId(String user, String noteId) { return id + "-" + StringUtils.join(keys, "-"); } - private String getInterpreterSessionId(String user, String noteId) { + private String getInterpreterSessionId(ExecutionContext executionContext) { String key; if (option.isExistingProcess()) { key = Constants.EXISTING_PROCESS; } else if (option.perNoteScoped() && option.perUserScoped()) { - key = user + ":" + noteId; + key = executionContext.getUser() + ":" + executionContext.getNoteId(); } else if (option.perUserScoped()) { - key = user; + key = executionContext.getUser(); } else if (option.perNoteScoped()) { - key = noteId; + key = executionContext.getNoteId(); } else { key = SHARED_SESSION; } @@ -459,12 +469,16 @@ private String getInterpreterSessionId(String user, String noteId) { } public ManagedInterpreterGroup getOrCreateInterpreterGroup(String user, String noteId) { - String groupId = getInterpreterGroupId(user, noteId); + return getOrCreateInterpreterGroup(new ExecutionContext(user, noteId)); + } + + public ManagedInterpreterGroup getOrCreateInterpreterGroup(ExecutionContext executionContext) { + String groupId = getInterpreterGroupId(executionContext); try { interpreterGroupWriteLock.lock(); if (!interpreterGroups.containsKey(groupId)) { - LOGGER.info("Create InterpreterGroup with groupId: {} for user: {} and note: {}", - groupId, user, noteId); + LOGGER.info("Create InterpreterGroup with groupId: {} for {}", + groupId, executionContext); ManagedInterpreterGroup intpGroup = createInterpreterGroup(groupId); interpreterGroups.put(groupId, intpGroup); } @@ -484,7 +498,11 @@ void removeInterpreterGroup(String groupId) { } public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) { - String groupId = getInterpreterGroupId(user, noteId); + return getInterpreterGroup(new ExecutionContext(user, noteId)); + } + + public ManagedInterpreterGroup getInterpreterGroup(ExecutionContext executionContext) { + String groupId = getInterpreterGroupId(executionContext); try { interpreterGroupReadLock.lock(); return interpreterGroups.get(groupId); @@ -518,10 +536,14 @@ Map getEditorFromSettingByClassName(String className) { return DEFAULT_EDITOR; } - void closeInterpreters(String user, String noteId) { - ManagedInterpreterGroup interpreterGroup = getInterpreterGroup(user, noteId); + public void closeInterpreters(String user, String noteId) { + closeInterpreters(new ExecutionContext(user, noteId)); + } + + public void closeInterpreters(ExecutionContext executionContext) { + ManagedInterpreterGroup interpreterGroup = getInterpreterGroup(executionContext); if (interpreterGroup != null) { - String sessionId = getInterpreterSessionId(user, noteId); + String sessionId = getInterpreterSessionId(executionContext); interpreterGroup.close(sessionId); } } @@ -850,26 +872,36 @@ synchronized RemoteInterpreterProcess createInterpreterProcess(String interprete } List getOrCreateSession(String user, String noteId) { - ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId); - Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " + - "noteId {}", user, noteId); - String sessionId = getInterpreterSessionId(user, noteId); - return interpreterGroup.getOrCreateSession(user, sessionId); + return getOrCreateSession(new ExecutionContext(user, noteId)); + } + + List getOrCreateSession(ExecutionContext executionContext) { + ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(executionContext); + Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for {}", executionContext); + String sessionId = getInterpreterSessionId(executionContext); + return interpreterGroup.getOrCreateSession(executionContext.getUser(), sessionId); } public Interpreter getDefaultInterpreter(String user, String noteId) { - return getOrCreateSession(user, noteId).get(0); + return getOrCreateSession(new ExecutionContext(user, noteId)).get(0); + } + + public Interpreter getDefaultInterpreter(ExecutionContext executionContext) { + return getOrCreateSession(executionContext).get(0); } public Interpreter getInterpreter(String user, String noteId, String replName) { - Preconditions.checkNotNull(noteId, "noteId should be not null"); + return getInterpreter(new ExecutionContext(user, noteId), replName); + } + + public Interpreter getInterpreter(ExecutionContext executionContext, String replName) { Preconditions.checkNotNull(replName, "replName should be not null"); String className = getInterpreterClassFromInterpreterSetting(replName); if (className == null) { return null; } - List interpreters = getOrCreateSession(user, noteId); + List interpreters = getOrCreateSession(executionContext); for (Interpreter interpreter : interpreters) { if (className.equals(interpreter.getClassName())) { return interpreter; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 2a64c9d1e87..87641a1e533 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -383,7 +383,7 @@ public ApplicationEventListener getAppEventListener() { } private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, - String interpreterJson, boolean override) throws IOException { + String interpreterJson, boolean override) throws IOException { URL[] urls = recursiveBuildLibList(new File(interpreterDir)); ClassLoader tempClassLoader = new URLClassLoader(urls, null); @@ -884,7 +884,12 @@ public void setPropertyAndRestart( } // restart in note page - public void restart(String settingId, String noteId, String user) throws InterpreterException { + public void restart(String settingId, String user, String noteId) throws InterpreterException { + restart(settingId, new ExecutionContext(user, noteId)); + } + + // restart in note page + public void restart(String settingId, ExecutionContext executionContext) throws InterpreterException { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); intpSetting = interpreterSettings.get(settingId); @@ -892,7 +897,7 @@ public void restart(String settingId, String noteId, String user) throws Interpr // If it did, overwrite old dependency jar with new one if (intpSetting != null) { copyDependenciesFromLocalPath(intpSetting); - intpSetting.closeInterpreters(user, noteId); + intpSetting.closeInterpreters(executionContext); } else { throw new InterpreterException("Interpreter setting id " + settingId + " not found"); } @@ -1033,7 +1038,8 @@ public Set getRunningInterpreters() { public void onNoteRemove(Note note, AuthenticationInfo subject) throws IOException { // remove from all interpreter instance's angular object registry for (InterpreterSetting settings : interpreterSettings.values()) { - InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), note.getId()); + InterpreterGroup interpreterGroup = settings.getInterpreterGroup( + new ExecutionContext(subject.getUser(), note.getId())); if (interpreterGroup != null) { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 5b388951abb..0af557c789d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -244,7 +245,7 @@ public void setVersion(String version) { } public String getDefaultInterpreterGroup() { - if (defaultInterpreterGroup == null) { + if (StringUtils.isBlank(defaultInterpreterGroup)) { defaultInterpreterGroup = ZeppelinConfiguration.create() .getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT); } @@ -306,7 +307,7 @@ void setParagraphJobListener(ParagraphJobListener paragraphJobListener) { this.paragraphJobListener = paragraphJobListener; } - public Boolean isCronSupported(ZeppelinConfiguration config) { + public boolean isCronSupported(ZeppelinConfiguration config) { if (config.isZeppelinNotebookCronEnable()) { config.getZeppelinNotebookCronFolders(); if (StringUtils.isBlank(config.getZeppelinNotebookCronFolders())) { @@ -840,7 +841,7 @@ private void snapshotAngularObjectRegistry(String user) { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getInterpreterGroup(new ExecutionContext(user, id)); if (intpGroup != null) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id)); @@ -857,10 +858,10 @@ private void removeAllAngularObjectInParagraph(String user, String paragraphId) } for (InterpreterSetting setting : settings) { - if (setting.getInterpreterGroup(user, id) == null) { + if (setting.getInterpreterGroup(new ExecutionContext(user, id)) == null) { continue; } - InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getInterpreterGroup(new ExecutionContext(user, id)); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { @@ -1001,6 +1002,15 @@ public void setRunning(boolean runStatus) { } } + public void setCronMode(boolean cronMode) { + info.put("inCronMode", cronMode); + } + + public boolean isCronMode() { + return Boolean.parseBoolean( + info.getOrDefault("inCronMode", "false").toString()); + } + public boolean isRunning() { return (boolean) getInfo().getOrDefault("isRunning", false); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 55eedbf485b..387a56e09a7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -34,6 +34,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -475,7 +476,8 @@ public Note loadNoteFromRepo(String id, AuthenticationInfo subject) { SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); List settings = interpreterSettingManager.get(); for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId()); + InterpreterGroup intpGroup = setting.getInterpreterGroup( + new ExecutionContext(subject.getUser(), note.getId())); if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); String noteId = snapshot.getAngularObject().getNoteId(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 756db7357c0..da4c7c63b28 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -40,6 +40,7 @@ import org.apache.zeppelin.display.Input; import org.apache.zeppelin.helium.HeliumPackage; import org.apache.zeppelin.interpreter.Constants; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -275,8 +276,9 @@ public boolean isEnabled() { } public Interpreter getBindedInterpreter() throws InterpreterNotFoundException { - return this.note.getInterpreterFactory().getInterpreter(user, note.getId(), intpText, - note.getDefaultInterpreterGroup()); + return this.note.getInterpreterFactory().getInterpreter(intpText, + new ExecutionContext(user, note.getId(), + note.getDefaultInterpreterGroup(), note.isCronMode())); } public void setInterpreter(Interpreter interpreter) { @@ -651,8 +653,8 @@ String extractVariablesFromAngularRegistry(String scriptBody, Map public boolean isValidInterpreter(String replName) { try { - return note.getInterpreterFactory().getInterpreter(user, note.getId(), replName, - note.getDefaultInterpreterGroup()) != null; + return note.getInterpreterFactory().getInterpreter(replName, + new ExecutionContext(user, note.getId(), note.getDefaultInterpreterGroup())) != null; } catch (InterpreterNotFoundException e) { return false; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java index 45133f94f80..af47a57c15a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -17,14 +17,10 @@ package org.apache.zeppelin.notebook.scheduler; -import java.io.IOException; -import java.util.Map; - import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.user.AuthenticationInfo; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; @@ -38,77 +34,42 @@ public class CronJob implements org.quartz.Job { @Override public void execute(JobExecutionContext context) { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); - Notebook notebook = (Notebook) jobDataMap.get("notebook"); - String noteId = jobDataMap.getString("noteId"); - LOGGER.info("Start cron job of note: " + noteId); - Note note = null; - try { - note = notebook.getNote(noteId); - if (note == null) { - LOGGER.warn("Skip cron job of note: " + noteId + ", because it is not found"); - return; - } - } catch (IOException e) { - LOGGER.warn("Skip cron job of note: " + noteId + ", because fail to get it", e); - return; - } + Note note = (Note) jobDataMap.get("note"); + LOGGER.info("Start cron job of note: " + note.getId()); if (note.haveRunningOrPendingParagraphs()) { LOGGER.warn( "execution of the cron job is skipped because there is a running or pending " + "paragraph (note id: {})", - noteId); - return; - } - - if (!note.isCronSupported(notebook.getConf())) { - LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + note.getId()); return; } - runAll(note); + try { + note.setCronMode(true); - boolean releaseResource = false; - String cronExecutingUser = null; - Map config = note.getConfig(); - if (config != null) { - if (config.containsKey("releaseresource")) { - releaseResource = (boolean) config.get("releaseresource"); + String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser"); + String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles"); + if (null == cronExecutingUser) { + cronExecutingUser = "anonymous"; + } + AuthenticationInfo authenticationInfo = + new AuthenticationInfo( + cronExecutingUser, + StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles, + null); + try { + note.runAll(authenticationInfo, true); + } catch (Exception e) { + LOGGER.warn("Fail to run note: " + note.getName(), e); } - cronExecutingUser = (String) config.get("cronExecutingUser"); - } - if (releaseResource) { - LOGGER.info("Releasing interpreters used by this note: " + noteId); + LOGGER.info("Releasing interpreters used by this note: " + note.getId()); for (InterpreterSetting setting : note.getUsedInterpreterSettings()) { - try { - notebook - .getInterpreterSettingManager() - .restart( - setting.getId(), - noteId, - cronExecutingUser != null ? cronExecutingUser : "anonymous"); - } catch (InterpreterException e) { - LOGGER.error("Fail to restart interpreter: " + setting.getId(), e); - } + setting.closeInterpreters(new ExecutionContext(cronExecutingUser, note.getId(), + note.getDefaultInterpreterGroup(), true)); } - } - } - - void runAll(Note note) { - String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser"); - String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles"); - if (null == cronExecutingUser) { - cronExecutingUser = "anonymous"; - } - AuthenticationInfo authenticationInfo = - new AuthenticationInfo( - cronExecutingUser, - StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles, - null); - try { - note.runAll(authenticationInfo, true); - } catch (Exception e) { - LOGGER.warn("Fail to run note: " + note.getName(), e); + } finally { + note.setCronMode(false); } } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java index 5139e35002a..f2874b3649b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java @@ -126,7 +126,7 @@ public boolean refreshCron(String noteId) { return false; } - if (!note.isCronSupported(zeppelinConfiguration).booleanValue()) { + if (!note.isCronSupported(zeppelinConfiguration)) { LOGGER.warn("Skip refresh cron of note {} because its cron is not enabled.", noteId); return false; } @@ -138,9 +138,7 @@ public boolean refreshCron(String noteId) { } JobDataMap jobDataMap = new JobDataMap(); - jobDataMap.put("noteId", noteId); - jobDataMap.put("notebook", notebook); - + jobDataMap.put("note", note); JobDetail newJob = JobBuilder.newJob(CronJob.class) .withIdentity(noteId, "note") diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java index d39b0ecd882..ab32d101e27 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java @@ -27,10 +27,12 @@ public class ConfInterpreterTest extends AbstractInterpreterTest { + private ExecutionContext executionContext = new ExecutionContext("user1", "note1", "test"); + @Test - public void testCorrectConf() throws IOException, InterpreterException { - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf", "test") instanceof ConfInterpreter); - ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf", "test"); + public void testCorrectConf() throws InterpreterException { + assertTrue(interpreterFactory.getInterpreter("test.conf", executionContext) instanceof ConfInterpreter); + ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("test.conf", executionContext); InterpreterContext context = InterpreterContext.builder() .setNoteId("noteId") @@ -40,8 +42,8 @@ public void testCorrectConf() throws IOException, InterpreterException { InterpreterResult result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code); - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test", "test") instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test", "test"); + assertTrue(interpreterFactory.getInterpreter("test", executionContext) instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test", executionContext); remoteInterpreter.interpret("hello world", context); assertEquals(7, remoteInterpreter.getProperties().size()); assertEquals("new_value", remoteInterpreter.getProperty("property_1")); @@ -58,9 +60,9 @@ public void testCorrectConf() throws IOException, InterpreterException { } @Test - public void testEmptyConf() throws IOException, InterpreterException { - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf", "test") instanceof ConfInterpreter); - ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf", "test"); + public void testEmptyConf() throws InterpreterException { + assertTrue(interpreterFactory.getInterpreter("test.conf", executionContext) instanceof ConfInterpreter); + ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("test.conf", executionContext); InterpreterContext context = InterpreterContext.builder() .setNoteId("noteId") @@ -69,8 +71,8 @@ public void testEmptyConf() throws IOException, InterpreterException { InterpreterResult result = confInterpreter.interpret("", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code); - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test", "test") instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test", "test"); + assertTrue(interpreterFactory.getInterpreter("test", executionContext) instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test", executionContext); assertEquals(6, remoteInterpreter.getProperties().size()); assertEquals("value_1", remoteInterpreter.getProperty("property_1")); assertEquals("value_3", remoteInterpreter.getProperty("property_3")); @@ -78,16 +80,16 @@ public void testEmptyConf() throws IOException, InterpreterException { @Test - public void testRunningAfterOtherInterpreter() throws IOException, InterpreterException { - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf", "test") instanceof ConfInterpreter); - ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf", "test"); + public void testRunningAfterOtherInterpreter() throws InterpreterException { + assertTrue(interpreterFactory.getInterpreter("test.conf", executionContext) instanceof ConfInterpreter); + ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("test.conf", executionContext); InterpreterContext context = InterpreterContext.builder() .setNoteId("noteId") .setParagraphId("paragraphId") .build(); - RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test", "test"); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test", executionContext); InterpreterResult result = remoteInterpreter.interpret("hello world", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 6d4666a6a91..87d65cdbd5e 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -29,32 +29,32 @@ public class InterpreterFactoryTest extends AbstractInterpreterTest { @Test public void testGetFactory() throws InterpreterException { - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "", "test") instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "", "test"); + assertTrue(interpreterFactory.getInterpreter("", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("", new ExecutionContext("user1", "note1", "test")); // EchoInterpreter is the default interpreter because test is the default interpreter group assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName()); - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "double_echo", "test") instanceof RemoteInterpreter); - remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "double_echo", "test"); + assertTrue(interpreterFactory.getInterpreter("double_echo", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("double_echo", new ExecutionContext("user1", "note1", "test")); assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName()); - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test", "test") instanceof RemoteInterpreter); - remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test", "test"); + assertTrue(interpreterFactory.getInterpreter("test", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test", new ExecutionContext("user1", "note1", "test")); assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName()); - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test2", "test") instanceof RemoteInterpreter); - remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test2", "test"); + assertTrue(interpreterFactory.getInterpreter("test2", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test2", new ExecutionContext("user1", "note1", "test")); assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName()); - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test2.double_echo", "test") instanceof RemoteInterpreter); - remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test2.double_echo", "test"); + assertTrue(interpreterFactory.getInterpreter("test2.double_echo", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test2.double_echo", new ExecutionContext("user1", "note1", "test")); assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName()); } @Test public void testUnknownRepl1() { try { - interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl", "test"); + interpreterFactory.getInterpreter("test.unknown_repl", new ExecutionContext("user1", "note1", "test")); fail("should fail due to no such interpreter"); } catch (InterpreterNotFoundException e) { assertEquals("No such interpreter: test.unknown_repl", e.getMessage()); @@ -64,7 +64,7 @@ public void testUnknownRepl1() { @Test public void testUnknownRepl2() { try { - interpreterFactory.getInterpreter("user1", "note1", "unknown_repl", "test"); + interpreterFactory.getInterpreter("unknown_repl", new ExecutionContext("user1", "note1", "test")); fail("should fail due to no such interpreter"); } catch (InterpreterNotFoundException e) { assertEquals("No such interpreter: unknown_repl", e.getMessage()); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java index 6d183675d41..61eefae29de 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java @@ -176,7 +176,7 @@ public void testCreateUpdateRemoveSetting() throws IOException, InterpreterExcep InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1"); assertEquals(3, interpreterGroup.getSessionNum()); // only close user1's session - interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + interpreterSettingManager.restart(interpreterSetting.getId(), "user1", "note1"); assertEquals(2, interpreterGroup.getSessionNum()); // remove interpreter setting @@ -192,13 +192,13 @@ public void testCreateUpdateRemoveSetting() throws IOException, InterpreterExcep //@Test public void testGetEditor() throws IOException, InterpreterNotFoundException { - Interpreter echoInterpreter = interpreterFactory.getInterpreter("user1", "note1", "test.echo", "test"); + Interpreter echoInterpreter = interpreterFactory.getInterpreter("test.echo", new ExecutionContext("user1", "note1", "test")); // get editor setting from interpreter-setting.json Map editor = interpreterSettingManager.getEditorSetting("test.echo", "note1"); assertEquals("java", editor.get("language")); // when editor setting doesn't exit, return the default editor - Interpreter mock1Interpreter = interpreterFactory.getInterpreter("user1", "note1", "mock1", "test"); + Interpreter mock1Interpreter = interpreterFactory.getInterpreter("mock1", new ExecutionContext("user1", "note1", "test")); editor = interpreterSettingManager.getEditorSetting("mock1", "note1"); assertEquals("text", editor.get("language")); } @@ -227,7 +227,7 @@ public void testRestartPerUserIsolated() throws InterpreterException { interpreterSetting.getOrCreateSession("user2", "note2"); assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); - interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + interpreterSettingManager.restart(interpreterSetting.getId(), "user1", "note1"); assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); } @@ -241,7 +241,7 @@ public void testRestartPerNoteIsolated() throws InterpreterException { interpreterSetting.getOrCreateSession("user2", "note2"); assertEquals(2, interpreterSetting.getAllInterpreterGroups().size()); - interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + interpreterSettingManager.restart(interpreterSetting.getId(), "user1", "note1"); assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); } @@ -256,7 +256,7 @@ public void testRestartPerUserScoped() throws InterpreterException { assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); - interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + interpreterSettingManager.restart(interpreterSetting.getId(), "user1", "note1"); assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); } @@ -272,7 +272,7 @@ public void testRestartPerNoteScoped() throws InterpreterException { assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); assertEquals(2, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); - interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + interpreterSettingManager.restart(interpreterSetting.getId(), "user1", "note1"); assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); assertEquals(1, interpreterSetting.getAllInterpreterGroups().get(0).getSessionNum()); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java index db080165a1c..3630d72ef6f 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java @@ -19,6 +19,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.AbstractInterpreterTest; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -46,8 +47,8 @@ public void setUp() throws Exception { @Test public void testTimeout_1() throws InterpreterException, InterruptedException, IOException { - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.echo", "test") instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.echo", "test"); + assertTrue(interpreterFactory.getInterpreter("test.echo", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test.echo", new ExecutionContext("user1", "note1", "test")); assertFalse(remoteInterpreter.isOpened()); InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test"); assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); @@ -70,8 +71,8 @@ public void testTimeout_1() throws InterpreterException, InterruptedException, I @Test public void testTimeout_2() throws InterpreterException, InterruptedException, IOException { - assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.sleep", "test") instanceof RemoteInterpreter); - final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.sleep", "test"); + assertTrue(interpreterFactory.getInterpreter("test.sleep", new ExecutionContext("user1", "note1", "test")) instanceof RemoteInterpreter); + final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("test.sleep", new ExecutionContext("user1", "note1", "test")); // simulate how zeppelin submit paragraph remoteInterpreter.getScheduler().submit(new Job("test-job", null) { diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java index 567823a9b8e..6118545be07 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java @@ -83,7 +83,7 @@ public void testMultipleInterpreterProcess() throws InterpreterException, IOExce assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size()); - interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1"); + interpreterSettingManager.restart(interpreterSetting.getId(), "user1", "note1"); assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size()); interpreterSetting.close(); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java index 272738944d6..5ef6d1f4b94 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteTest.java @@ -86,7 +86,7 @@ public void setUp() { @Test public void runNormalTest() throws InterpreterNotFoundException { - when(interpreterFactory.getInterpreter(anyString(), anyString(), eq("spark"), anyString())).thenReturn(interpreter); + when(interpreterFactory.getInterpreter(eq("spark"), any())).thenReturn(interpreter); when(interpreter.getScheduler()).thenReturn(scheduler); String pText = "%spark sc.version"; @@ -99,7 +99,7 @@ public void runNormalTest() throws InterpreterNotFoundException { ArgumentCaptor pCaptor = ArgumentCaptor.forClass(Paragraph.class); verify(scheduler, only()).submit(pCaptor.capture()); - verify(interpreterFactory, times(1)).getInterpreter(anyString(), anyString(), eq("spark"), anyString()); + verify(interpreterFactory, times(1)).getInterpreter(eq("spark"), any()); assertEquals("Paragraph text", pText, pCaptor.getValue().getText()); } @@ -113,7 +113,7 @@ public void addParagraphWithEmptyReplNameTest() { @Test public void addParagraphWithLastReplNameTest() throws InterpreterNotFoundException { - when(interpreterFactory.getInterpreter(anyString(), anyString(), eq("spark"), anyString())).thenReturn(interpreter); + when(interpreterFactory.getInterpreter(eq("spark"), any())).thenReturn(interpreter); Note note = new Note("test", "", interpreterFactory, interpreterSettingManager, paragraphJobListener, credentials, noteEventListener); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("%spark "); @@ -124,7 +124,7 @@ public void addParagraphWithLastReplNameTest() throws InterpreterNotFoundExcepti @Test public void insertParagraphWithLastReplNameTest() throws InterpreterNotFoundException { - when(interpreterFactory.getInterpreter(anyString(), anyString(), eq("spark"), anyString())).thenReturn(interpreter); + when(interpreterFactory.getInterpreter(eq("spark"), any())).thenReturn(interpreter); Note note = new Note("test", "", interpreterFactory, interpreterSettingManager, paragraphJobListener, credentials, noteEventListener); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("%spark "); @@ -135,7 +135,7 @@ public void insertParagraphWithLastReplNameTest() throws InterpreterNotFoundExce @Test public void insertParagraphWithInvalidReplNameTest() throws InterpreterNotFoundException { - when(interpreterFactory.getInterpreter(anyString(), anyString(), eq("invalid"), anyString())).thenReturn(null); + when(interpreterFactory.getInterpreter(eq("invalid"), any())).thenReturn(null); Note note = new Note("test", "", interpreterFactory, interpreterSettingManager, paragraphJobListener, credentials, noteEventListener); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("%invalid "); @@ -153,7 +153,7 @@ public void insertParagraphwithUser() { @Test public void clearAllParagraphOutputTest() throws InterpreterNotFoundException { - when(interpreterFactory.getInterpreter(anyString(), anyString(), eq("md"), anyString())).thenReturn(interpreter); + when(interpreterFactory.getInterpreter(eq("md"), any())).thenReturn(interpreter); when(interpreter.getScheduler()).thenReturn(scheduler); Note note = new Note("test", "", interpreterFactory, interpreterSettingManager, paragraphJobListener, credentials, noteEventListener); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 5a4752b99c9..029b3ab7519 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -22,6 +22,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.AbstractInterpreterTest; +import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterNotFoundException; @@ -694,9 +695,9 @@ public void testAutoRestartInterpreterAfterSchedule() throws InterruptedExceptio schedulerService.refreshCron(note.getId()); - RemoteInterpreter mock1 = (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), "mock1", "test"); + RemoteInterpreter mock1 = (RemoteInterpreter) interpreterFactory.getInterpreter("mock1", new ExecutionContext(anonymous.getUser(), note.getId(), "test")); - RemoteInterpreter mock2 = (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), "mock2", "test"); + RemoteInterpreter mock2 = (RemoteInterpreter) interpreterFactory.getInterpreter("mock2", new ExecutionContext(anonymous.getUser(), note.getId(), "test")); // wait until interpreters are started while (!mock1.isOpened() || !mock2.isOpened()) { @@ -733,8 +734,7 @@ public void testCronWithReleaseResourceClosesOnlySpecificInterpreters() } }); RemoteInterpreter cronNoteInterpreter = - (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), - cronNote.getId(), "mock1", "test"); + (RemoteInterpreter) interpreterFactory.getInterpreter("mock1", new ExecutionContext(anonymous.getUser(), cronNote.getId(), "test")); // create a paragraph of the cron scheduled note. Paragraph cronNoteParagraph = cronNote.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -749,8 +749,7 @@ public void testCronWithReleaseResourceClosesOnlySpecificInterpreters() Note anotherNote = notebook.createNote("note1", anonymous); RemoteInterpreter anotherNoteInterpreter = - (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), - anotherNote.getId(), "mock2", "test"); + (RemoteInterpreter) interpreterFactory.getInterpreter("mock2", new ExecutionContext(anonymous.getUser(), anotherNote.getId(), "test")); // create a paragraph of another note Paragraph anotherNoteParagraph = anotherNote.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -1405,7 +1404,7 @@ public void testGetAllNotes() throws Exception { notebook.removeNote(note1.getId(), AuthenticationInfo.ANONYMOUS); notebook.removeNote(note2.getId(), AuthenticationInfo.ANONYMOUS); } - + @Test public void testCreateDuplicateNote() throws Exception { Note note1 = notebook.createNote("note1", anonymous);