From d14e8bd821509389666cd22ff7888a30c634e48a Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sun, 16 Oct 2016 15:06:17 +0800 Subject: [PATCH 1/3] ZEPPELIN-1577. LivyInterpreter should not use FIFOScheduler --- .../zeppelin/livy/LivyPySparkInterpreter.java | 4 +- .../zeppelin/livy/LivySparkInterpreter.java | 8 +- .../zeppelin/livy/LivySparkRInterpreter.java | 4 +- .../livy/LivySparkSQLInterpreter.java | 2 +- .../interpreter/remote/RemoteInterpreter.java | 2 +- .../remote/RemoteInterpreterServer.java | 5 + .../zeppelin/scheduler/ExecutorFactory.java | 13 +- .../scheduler/FIFOPerUserScheduler.java | 118 ++++++++++++++++++ .../zeppelin/scheduler/FIFOScheduler.java | 2 +- .../org/apache/zeppelin/scheduler/Job.java | 8 +- .../zeppelin/scheduler/ParallelScheduler.java | 2 +- .../zeppelin/scheduler/RemoteScheduler.java | 8 +- .../zeppelin/scheduler/SchedulerFactory.java | 11 ++ .../apache/zeppelin/notebook/Paragraph.java | 12 +- 14 files changed, 174 insertions(+), 25 deletions(-) create mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOPerUserScheduler.java diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java index bd342a2e3ed..75aa39ba0ac 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java @@ -101,8 +101,8 @@ public int getProgress(InterpreterContext context) { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - LivyPySparkInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetFIFOPerUserScheduler( + LivySparkInterpreter.class.getName()); } @Override diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 9a9dd8080ee..d21d2f2e5af 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; /** * Livy Spark interpreter for Zeppelin. @@ -37,7 +38,7 @@ public class LivySparkInterpreter extends Interpreter { Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class); private LivyOutputStream out; - protected static Map userSessionMap; + protected static Map userSessionMap = new ConcurrentHashMap<>(); protected static Map sessionId2AppIdMap; protected static Map sessionId2WebUIMap; @@ -46,7 +47,6 @@ public class LivySparkInterpreter extends Interpreter { public LivySparkInterpreter(Properties property) { super(property); - userSessionMap = new HashMap<>(); sessionId2AppIdMap = new HashMap<>(); sessionId2WebUIMap = new HashMap<>(); livyHelper = new LivyHelper(property); @@ -151,8 +151,8 @@ public int getProgress(InterpreterContext context) { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - LivySparkInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetFIFOPerUserScheduler( + LivySparkInterpreter.class.getName()); } @Override diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java index 753b378e9a1..ff0732c8b26 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java @@ -101,8 +101,8 @@ public int getProgress(InterpreterContext context) { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - LivySparkRInterpreter.class.getName() + this.hashCode()); + return SchedulerFactory.singleton().createOrGetFIFOPerUserScheduler( + LivySparkInterpreter.class.getName()); } @Override diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 3d4a0f4e428..c32da885183 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -146,7 +146,7 @@ public Scheduler getScheduler() { if (concurrentSQL()) { int maxConcurrency = 10; return SchedulerFactory.singleton().createOrGetParallelScheduler( - LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency); + LivySparkSQLInterpreter.class.getName() + this.hashCode(), maxConcurrency); } else { Interpreter intp = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName()); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index e0cdaa338b1..33fafea37e3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -296,7 +296,7 @@ public void close() { @Override public InterpreterResult interpret(String st, InterpreterContext context) { if (logger.isDebugEnabled()) { - logger.debug("st:\n{}", st); + logger.debug("st:\n{},\nuser:{}", st, context.getAuthenticationInfo().getUser()); } FormType form = getFormType(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 0a7b1ed6912..57010c6f434 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -427,6 +427,11 @@ public void onPostExecute(String script) { hookListener.onPostExecute(script); } + @Override + public String getUser() { + return context.getAuthenticationInfo().getUser(); + } + @Override protected Object jobRun() throws Throwable { try { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java index 31b534e11f7..95eb90060a3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; /** * @@ -53,10 +54,18 @@ public ExecutorService createOrGet(String name) { return createOrGet(name, 100); } - public ExecutorService createOrGet(String name, int numThread) { + public ExecutorService createOrGet(final String name, int numThread) { synchronized (executor) { if (!executor.containsKey(name)) { - executor.put(name, Executors.newScheduledThreadPool(numThread)); + executor.put(name, Executors.newScheduledThreadPool(numThread, new ThreadFactory(){ + int i = 0; + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName(name + "-" + i++); + return thread; + } + })); } return executor.get(name); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOPerUserScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOPerUserScheduler.java new file mode 100644 index 00000000000..fcdefd45b86 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOPerUserScheduler.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ExecutorService; + +/** + * Each user use one FIFOScheduler + */ +public class FIFOPerUserScheduler implements Scheduler { + + private static Logger LOGGER = LoggerFactory.getLogger(FIFOScheduler.class); + + private ExecutorService executor; + private SchedulerListener listener; + boolean terminate = false; + private String name; + private int maxUser = 10; + + private Map schedulerMap = new HashMap<>(); + + public FIFOPerUserScheduler(String name, ExecutorService executor, SchedulerListener listener) { + this.name = name; + this.executor = executor; + this.listener = listener; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public Collection getJobsWaiting() { + synchronized (schedulerMap) { + List waitingJobs = new ArrayList(); + for (FIFOScheduler scheduler : schedulerMap.values()) { + waitingJobs.addAll(scheduler.getJobsWaiting()); + } + return waitingJobs; + } + } + + @Override + public Collection getJobsRunning() { +// return new ArrayList<>(); + synchronized (schedulerMap) { + List runningJobs = new ArrayList(); + for (FIFOScheduler scheduler : schedulerMap.values()) { + runningJobs.addAll(scheduler.getJobsRunning()); + } + return runningJobs; + } + } + + @Override + public void submit(Job job) { + synchronized (schedulerMap) { + FIFOScheduler scheduler = schedulerMap.get(job.getUser()); + if (scheduler == null) { + scheduler = (FIFOScheduler) SchedulerFactory.singleton() + .createOrGetFIFOScheduler(this.name + "_" + job.getUser()); + executor.execute(scheduler); + } + LOGGER.debug("Submitting job for owner:" + job.getUser()); + scheduler.submit(job); + schedulerMap.put(job.getUser(), scheduler); + } + } + + @Override + public Job removeFromWaitingQueue(String jobId) { + synchronized (schedulerMap) { + for (FIFOScheduler scheduler : schedulerMap.values()) { + Job job = scheduler.removeFromWaitingQueue(jobId); + if (job != null) { + schedulerMap.notify(); + return job; + } + } + return null; + } + } + + @Override + public void stop() { + synchronized (schedulerMap) { + for (FIFOScheduler scheduler : schedulerMap.values()) { + scheduler.stop(); + } + } + } + + @Override + public void run() { + // Do nothing, all the work is delegated to FIFOScheduler in schedulerMap + } +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index 1837e4db5a6..cd700545842 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -35,7 +35,7 @@ public class FIFOScheduler implements Scheduler { private ExecutorService executor; private SchedulerListener listener; boolean terminate = false; - Job runningJob = null; + volatile Job runningJob = null; private String name; static Logger LOGGER = LoggerFactory.getLogger(FIFOScheduler.class); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 2dc1719ebac..8f3eb432f76 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -107,14 +107,14 @@ public Job(String jobId, String jobName, JobListener listener) { this(jobId, jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC); } - public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) { + public Job(String jobId, String jobName, JobListener listener, + long progressUpdateIntervalMs) { this.jobName = jobName; this.listener = listener; this.progressUpdateIntervalMs = progressUpdateIntervalMs; dateCreated = new Date(); id = jobId; - setStatus(Status.READY); } @@ -122,6 +122,10 @@ public String getId() { return id; } + public String getUser() { + return null; + } + @Override public int hashCode() { return id.hashCode(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java index 1873944bb83..1384ae3a993 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -111,7 +111,7 @@ public void run() { try { queue.wait(500); } catch (InterruptedException e) { - LOGGER.error("Exception in MockInterpreterAngular while interpret queue.wait", e); + LOGGER.error("Exception in ParallelScheduler while interpret queue.wait", e); } continue; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 28c743740ed..d30a7a4c3b3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -32,12 +32,13 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; /** * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter */ public class RemoteScheduler implements Scheduler { - Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); + private static Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); List queue = new LinkedList(); List running = new LinkedList(); @@ -89,6 +90,7 @@ public void run() { synchronized (queue) { try { queue.wait(500); + logger.debug("wait to be submitted to remote, job:" + job.getUser()); } catch (InterruptedException e) { logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " + "queue.wait", e); @@ -265,6 +267,7 @@ public synchronized Job.Status getStatus() { // not found this job in the remote schedulers. // maybe not submitted, maybe already finished //Status status = getLastStatus(); + logger.warn("job {} is not found in remote interpreter", job.getId()); listener.afterStatusChange(job, null, null); return job.getStatus(); } @@ -291,7 +294,7 @@ private class JobRunner implements Runnable, JobListener { private Scheduler scheduler; private Job job; private boolean jobExecuted; - boolean jobSubmittedRemotely; + private volatile boolean jobSubmittedRemotely; public JobRunner(Scheduler scheduler, Job job) { this.scheduler = scheduler; @@ -327,7 +330,6 @@ public void run() { listener.jobStarted(scheduler, job); } job.run(); - jobExecuted = true; jobSubmittedRemotely = true; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 20b4b8aa916..476aa0b17e4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -84,6 +84,17 @@ public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { } } + public Scheduler createOrGetFIFOPerUserScheduler(String name) { + synchronized (schedulers) { + if (schedulers.containsKey(name) == false) { + Scheduler s = new FIFOPerUserScheduler(name, executor, this); + schedulers.put(name, s); + executor.execute(s); + } + return schedulers.get(name); + } + } + public Scheduler createOrGetRemoteScheduler( String name, String noteId, diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index ad580ed9504..bb9ddb5ca2c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -69,7 +69,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { @VisibleForTesting Paragraph() { - super(generateId(), null); + super(generateId(), null, null); config = new HashMap<>(); settings = new GUI(); } @@ -105,10 +105,6 @@ private static String generateId() { + new Random(System.currentTimeMillis()).nextInt(); } - public String getUser() { - return user; - } - public String getText() { return text; } @@ -127,6 +123,11 @@ public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { this.user = authenticationInfo.getUser(); } + @Override + public String getUser() { + return user; + } + public String getTitle() { return title; } @@ -330,7 +331,6 @@ protected Object jobRun() throws Throwable { InterpreterContext context = getInterpreterContext(); InterpreterContext.set(context); InterpreterResult ret = repl.interpret(script, context); - if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { return getReturn(); } From a9e273ff0f5b7a285749a616046e7d5fb8603523 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 27 Oct 2016 16:40:08 +0800 Subject: [PATCH 2/3] add test --- .../livy/LivySparkSQLInterpreter.java | 6 ++- ...rationTest.java => LivyInterpreterIT.java} | 54 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) rename livy/src/test/java/org/apache/zeppelin/livy/{LivyIntegrationTest.java => LivyInterpreterIT.java} (82%) diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index c32da885183..4442213307a 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -34,7 +34,7 @@ */ public class LivySparkSQLInterpreter extends Interpreter { - Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class); protected Map userSessionMap; private LivyHelper livyHelper; @@ -57,6 +57,10 @@ public void close() { @Override public InterpreterResult interpret(String line, InterpreterContext interpreterContext) { try { + LOGGER.info("***********sessionMap size:" + userSessionMap.size()); + for (Map.Entry entry : userSessionMap.entrySet()) { + LOGGER.info("Session {}, user:{}", entry.getValue(), entry.getKey()); + } if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) { try { userSessionMap.put( diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyIntegrationTest.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java similarity index 82% rename from livy/src/test/java/org/apache/zeppelin/livy/LivyIntegrationTest.java rename to livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index 6df37007b35..19115ef1d66 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyIntegrationTest.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -35,9 +35,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class LivyIntegrationTest { +public class LivyInterpreterIT { - private static Logger LOGGER = LoggerFactory.getLogger(LivyIntegrationTest.class); + private static Logger LOGGER = LoggerFactory.getLogger(LivyInterpreterIT.class); private static Cluster cluster; private static Properties properties; @@ -210,6 +210,56 @@ public void testSparkRInterpreter() { // TODO (zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release. } + @Test + public void testScheduler() throws InterruptedException { + if (!checkPreCondition()) { + return; + } + + final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); + MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); + InterpreterOutput output = new InterpreterOutput(outputListener); + AuthenticationInfo authInfo1 = new AuthenticationInfo("user1"); + final InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "title", + "text", authInfo1, null, null, null, null, null, output); + AuthenticationInfo authInfo2 = new AuthenticationInfo("user2"); + final InterpreterContext context2 = new InterpreterContext("noteId", "paragraphId", "title", + "text", authInfo2, null, null, null, null, null, output); + sparkInterpreter.open(); + // initialize session first + sparkInterpreter.interpret("sc.version", context1); + sparkInterpreter.interpret("sc.version", context2); + + long startTime = System.currentTimeMillis(); + Thread thread1 = new Thread() { + @Override + public void run() { + InterpreterResult result = sparkInterpreter.interpret("Thread.sleep(1000*10)", context1); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + } + }; + Thread thread2 = new Thread() { + @Override + public void run() { + InterpreterResult result = sparkInterpreter.interpret("Thread.sleep(1000*10)", context2); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + } + }; + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + long endTime = System.currentTimeMillis(); + long timeCost = endTime - startTime; + assertTrue("jobs in 2 sessions are not scheduled parallelly", timeCost < 20*1000); + + + // close sessions + sparkInterpreter.close(); + } + public static class MyInterpreterOutputListener implements InterpreterOutputListener { private StringBuilder outputAppended = new StringBuilder(); private StringBuilder outputUpdated = new StringBuilder(); From 86d937e2da2dbbabcaca675b64e1984e5dba98d3 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 31 Oct 2016 17:01:12 +0800 Subject: [PATCH 3/3] fix unit test --- .../org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 740ef4066c4..dc1d520eab1 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -341,6 +341,10 @@ public void zRunTest() throws IOException { waitForFinish(p0); assertEquals(Status.FINISHED, p0.getStatus()); + note.run(p1.getId()); + waitForFinish(p1); + assertEquals(Status.FINISHED, p1.getStatus()); + note.run(p2.getId()); waitForFinish(p2); assertEquals(Status.FINISHED, p2.getStatus());