Closed

30 commits
c91f498  prepend InterpreterOutputStream to interpreter result (Leemoonsoo, Dec 22, 2015)
1f419b6  Add InterpreterOutput (Leemoonsoo, Dec 22, 2015)
a07d7db  Implement InterpreterResult.toString (Leemoonsoo, Dec 27, 2015)
0f60b54  Update test (Leemoonsoo, Dec 27, 2015)
fb5e7b5  Update test (Leemoonsoo, Dec 27, 2015)
a42e4ff  Update test (Leemoonsoo, Jan 3, 2016)
89d9798  Render text output line by line (Leemoonsoo, Jan 4, 2016)
6f607f7  Add newline listener (Leemoonsoo, Jan 4, 2016)
258ff38  Barely working (Leemoonsoo, Jan 5, 2016)
786c978  update paragraph object after writing to outputstream (Leemoonsoo, Jan 5, 2016)
2060c1e  Clear before render text (Leemoonsoo, Jan 6, 2016)
e7a9b37  Delayed persist (Leemoonsoo, Jan 6, 2016)
8a1223f  Handle update output correctly (Leemoonsoo, Jan 7, 2016)
c01df62  Connect Spark interpreter Console.out to outputstream (Leemoonsoo, Jan 7, 2016)
479b836  Add test (Leemoonsoo, Jan 7, 2016)
e278e84  Clear output correctly (Leemoonsoo, Jan 7, 2016)
37d6920  Handle display system directive correctly (Leemoonsoo, Jan 8, 2016)
846015b  Update scalding (Leemoonsoo, Jan 8, 2016)
626ad48  Add license header (Leemoonsoo, Jan 8, 2016)
b68180e  Add InterpreterOutput on spark interpreter unit test (Leemoonsoo, Jan 8, 2016)
6d9cc51  Pass InterpreterOutput to SparkILoop (Leemoonsoo, Jan 8, 2016)
bc6262e  Make PysparkInterpreter stream output (Leemoonsoo, Jan 8, 2016)
07b3e1a  Handle clear output correctly (Leemoonsoo, Jan 11, 2016)
d29cfbf  workaround jshint (Leemoonsoo, Jan 13, 2016)
f7e6a4d  Fix syntax error (Leemoonsoo, Jan 13, 2016)
18215a3  fix style (Leemoonsoo, Jan 13, 2016)
9c9c8fd  update test (Leemoonsoo, Jan 13, 2016)
8251fb4  Fix syntax and style (Leemoonsoo, Jan 13, 2016)
dedae0d  Remove debug lines (Leemoonsoo, Jan 16, 2016)
53e2bb4  Not persist on every append (Leemoonsoo, Jan 16, 2016)
FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public static void setUp() {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
- context = new InterpreterContext(null, null, null, null, null, null, null, null);
+ context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
}

@AfterClass
HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public void readTest() throws IOException {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();

assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message().contains("SCHEMA_NAME"));
assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}

@Test
@@ -101,7 +101,7 @@ public void readTestWithConfiguration() throws IOException {
t.open();

assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
}

@Test
@@ -117,13 +117,13 @@ public void jdbcRestart() throws IOException, SQLException, ClassNotFoundException {
t.open();

InterpreterResult interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());

t.getConnection("default").close();

interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}

@@ -139,7 +139,7 @@ public void test() throws IOException {
HiveInterpreter t = new HiveInterpreter(properties);
t.open();

- InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null);
+ InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);

//simple select test
InterpreterResult result = t.interpret("select * from test_table", interpreterContext);
IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";

private static final InterpreterContext INTP_CONTEXT =
- new InterpreterContext(null, null, null, null, null, null, null, null);
+ new InterpreterContext(null, null, null, null, null, null, null, null, null);

private IgniteInterpreter intp;
private Ignite ignite;
IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";

private static final InterpreterContext INTP_CONTEXT =
- new InterpreterContext(null, null, null, null, null, null, null, null);
+ new InterpreterContext(null, null, null, null, null, null, null, null, null);

private Ignite ignite;
private IgniteSqlInterpreter intp;
@@ -65,7 +65,7 @@ public void setUp() throws Exception {
context = new InterpreterContext("note", "id", "title", "text",
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
+ new LinkedList<InterpreterContextRunner>(), null);
}

@After
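All of the test diffs above make the same mechanical change: InterpreterContext gained a ninth constructor argument, and existing call sites pass null for it. Given the rest of this PR, that argument is the paragraph's InterpreterOutput. A minimal fixture in the style of the updated tests; the import paths and the AngularObjectRegistry arguments are copied from the diffs, everything else is illustrative:

import java.util.HashMap;
import java.util.LinkedList;

import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;

public class ContextFixture {
  // Builds the nine-argument InterpreterContext the updated tests use.
  public static InterpreterContext testContext() {
    return new InterpreterContext("note", "id", "title", "text",
        new HashMap<String, Object>(),                // paragraph config
        new GUI(),                                    // form values
        new AngularObjectRegistry("group", null),     // angular object registry
        new LinkedList<InterpreterContextRunner>(),   // paragraph runners
        null);                                        // new: InterpreterOutput (null = output not captured)
  }
}

Production code passes a real InterpreterOutput here, which is what context.out resolves to in the interpreter diffs below.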
PySparkInterpreter.java
@@ -73,8 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler {
private GatewayServer gatewayServer;
private DefaultExecutor executor;
private int port;
- private ByteArrayOutputStream outputStream;
- private ByteArrayOutputStream errStream;
+ private SparkOutputStream outputStream;
private BufferedWriter ins;
private PipedInputStream in;
private ByteArrayOutputStream input;
@@ -173,7 +172,7 @@ private void createGatewayServerAndStartScript() {
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
executor = new DefaultExecutor();
- outputStream = new ByteArrayOutputStream();
+ outputStream = new SparkOutputStream();
PipedOutputStream ps = new PipedOutputStream();
in = null;
try {
@@ -274,7 +273,6 @@ public void setStatementsFinished(String out, boolean error) {
statementError = error;
statementFinishedNotifier.notify();
}

}

boolean pythonScriptInitialized = false;
@@ -287,6 +285,10 @@ public void onPythonScriptInitialized() {
}
}

+ public void appendOutput(String message) throws IOException {
+   outputStream.getInterpreterOutput().write(message);
+ }

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
@@ -300,7 +302,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
+ outputStream.toString());
}

- outputStream.reset();
+ outputStream.setInterpreterOutput(context.out);

synchronized (pythonScriptInitializeNotifier) {
long startTime = System.currentTimeMillis();
@@ -314,15 +316,24 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
}
}

String errorMessage = "";
try {
context.out.flush();
errorMessage = new String(context.out.toByteArray());
} catch (IOException e) {
throw new InterpreterException(e);
}


if (pythonscriptRunning == false) {
// python script failed to initialize and terminated
return new InterpreterResult(Code.ERROR, "failed to start pyspark"
-     + outputStream.toString());
+     + errorMessage);
}
if (pythonScriptInitialized == false) {
// timeout. didn't get initialized message
return new InterpreterResult(Code.ERROR, "pyspark is not responding "
-     + outputStream.toString());
+     + errorMessage);
}

if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
@@ -352,7 +363,14 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
if (statementError) {
return new InterpreterResult(Code.ERROR, statementOutput);
} else {
- return new InterpreterResult(Code.SUCCESS, statementOutput);
+
+ try {
+   context.out.flush();
+ } catch (IOException e) {
+   throw new InterpreterException(e);
+ }
+
+ return new InterpreterResult(Code.SUCCESS);
}
}

@@ -389,8 +407,6 @@ public List<String> completion(String buf, int cursor) {
return new LinkedList<String>();
}

- outputStream.reset();
-
pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
statementOutput = null;

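The piece that makes the PySpark side stream is appendOutput, added above: the Python helper script calls back into the JVM through the Py4J gateway, and each chunk of output is written into the attached InterpreterOutput while the statement runs, instead of being buffered until it finishes. A stripped-down sketch of that callback surface; only appendOutput(String) is taken from the diff, while the entry-point wiring, port, and StringBuilder stand-in are illustrative:

import py4j.GatewayServer;

public class OutputBridge {
  // Stand-in for the paragraph's InterpreterOutput.
  private final StringBuilder paragraphOutput = new StringBuilder();

  // Called from Python via the Py4J gateway for each chunk of output.
  // The real method writes to outputStream.getInterpreterOutput().
  public void appendOutput(String message) {
    paragraphOutput.append(message);
  }

  public static void main(String[] args) {
    // Python side would call: gateway.entry_point.appendOutput("hello\n")
    GatewayServer server = new GatewayServer(new OutputBridge(), 25333);
    server.start();
  }
}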
spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java (32 changes: 17 additions, 15 deletions)
@@ -17,9 +17,7 @@

package org.apache.zeppelin.spark;

- import java.io.ByteArrayOutputStream;
import java.io.File;
- import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -41,7 +39,6 @@
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
- import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -115,7 +112,7 @@ public class SparkInterpreter extends Interpreter {
private SparkILoop interpreter;
private SparkIMain intp;
private SparkContext sc;
- private ByteArrayOutputStream out;
+ private SparkOutputStream out;
private SQLContext sqlc;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
@@ -129,7 +126,7 @@ public class SparkInterpreter extends Interpreter {

public SparkInterpreter(Properties property) {
super(property);
- out = new ByteArrayOutputStream();
+ out = new SparkOutputStream();
}

public SparkInterpreter(Properties property, SparkContext sc) {
@@ -452,10 +449,9 @@ public void open() {
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);

- PrintStream printStream = new PrintStream(out);

/* spark interpreter */
this.interpreter = new SparkILoop(null, new PrintWriter(out));

interpreter.settings_$eq(settings);

interpreter.createInterpreter();
@@ -481,15 +477,14 @@

dep = getDependencyResolver();

- z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+ z = new ZeppelinContext(sc, sqlc, null, dep,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));

intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
binder = (Map<String, Object>) getValue("_binder");
binder.put("sc", sc);
binder.put("sqlc", sqlc);
binder.put("z", z);
binder.put("out", printStream);

intp.interpret("@transient val z = "
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
@@ -675,13 +670,13 @@ public InterpreterResult interpret(String[] lines, InterpreterContext context) {
synchronized (this) {
z.setGui(context.getGui());
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
- InterpreterResult r = interpretInput(lines);
+ InterpreterResult r = interpretInput(lines, context);
sc.clearJobGroup();
return r;
}
}

- public InterpreterResult interpretInput(String[] lines) {
+ public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
SparkEnv.set(env);

// add print("") to make sure not finishing with comment
@@ -692,8 +687,9 @@
}
linesToRun[lines.length] = "print(\"\")";

Console.setOut((java.io.PrintStream) binder.get("out"));
out.reset();
Console.setOut(context.out);
out.setInterpreterOutput(context.out);
context.out.clear();
Code r = null;
String incomplete = "";

@@ -713,6 +709,7 @@
res = intp.interpret(incomplete + s);
} catch (Exception e) {
sc.clearJobGroup();
+ out.setInterpreterOutput(null);
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
@@ -721,7 +718,8 @@

if (r == Code.ERROR) {
sc.clearJobGroup();
- return new InterpreterResult(r, out.toString());
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(r, "");
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
@@ -730,9 +728,13 @@
}

if (r == Code.INCOMPLETE) {
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
} else {
- return new InterpreterResult(r, out.toString());
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(Code.SUCCESS);
}
}

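The SparkInterpreter changes all enforce one contract: attach the paragraph's stream before evaluating, and detach on every exit path (exception, error, incomplete, success) so that output from a later run cannot leak into an earlier paragraph. A self-contained analogue in plain Java, with System.out standing in for scala.Console and a ByteArrayOutputStream standing in for context.out; the try/finally here is just a compact way to express the detach-on-every-path rule that the patch writes out branch by branch:

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class RedirectLifecycleDemo {
  public static void main(String[] args) {
    PrintStream original = System.out;
    ByteArrayOutputStream paragraph = new ByteArrayOutputStream();

    // Attach: everything printed while the paragraph runs goes to its stream.
    System.setOut(new PrintStream(paragraph, true));
    try {
      System.out.println("streamed while the paragraph is running");
    } finally {
      // Detach: later output must not end up in this paragraph.
      System.setOut(original);
    }

    System.out.println("captured: " + paragraph.toString().trim());
  }
}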
spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java (new file)
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;

import org.apache.zeppelin.interpreter.InterpreterOutput;

import java.io.IOException;
import java.io.OutputStream;

/**
* InterpreterOutput can be attached / detached.
*/
public class SparkOutputStream extends OutputStream {
InterpreterOutput interpreterOutput;

public SparkOutputStream() {
}

public InterpreterOutput getInterpreterOutput() {
return interpreterOutput;
}

public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
this.interpreterOutput = interpreterOutput;
}

@Override
public void write(int b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}

@Override
public void write(byte [] b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}

@Override
public void write(byte [] b, int offset, int len) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b, offset, len);
}
}

@Override
public void close() throws IOException {
if (interpreterOutput != null) {
interpreterOutput.close();
}
}

@Override
public void flush() throws IOException {
if (interpreterOutput != null) {
interpreterOutput.flush();
}
}
}
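Note the failure mode this class picks: every override null-checks interpreterOutput, so writing to a detached stream silently drops bytes instead of throwing. That is what lets SparkILoop's long-lived PrintWriter keep pointing at a single SparkOutputStream while interpret() swaps the destination per run. A minimal standalone re-creation of the behavior, with a plain OutputStream in place of InterpreterOutput so it runs without Zeppelin on the classpath:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DetachableStreamDemo {
  // Same shape as SparkOutputStream, with OutputStream as the target type.
  static class DetachableStream extends OutputStream {
    private OutputStream target;  // null = detached

    void setTarget(OutputStream t) { target = t; }

    @Override
    public void write(int b) throws IOException {
      if (target != null) {
        target.write(b);  // forward only while attached
      }
    }
  }

  public static void main(String[] args) throws IOException {
    DetachableStream out = new DetachableStream();
    ByteArrayOutputStream paragraph = new ByteArrayOutputStream();

    out.write('x');            // detached: dropped, no exception
    out.setTarget(paragraph);  // like setInterpreterOutput(context.out)
    out.write('y');            // attached: forwarded
    out.setTarget(null);       // like setInterpreterOutput(null)
    out.write('z');            // dropped again

    System.out.println(paragraph.toString());  // prints "y"
  }
}

A write that arrives between paragraph runs is intentionally lost rather than attributed to the wrong paragraph.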