diff --git a/.travis.yml b/.travis.yml
index 076638c1410..40e368a5688 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -239,3 +239,4 @@ after_failure:
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
- cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
+ - cat flink/*.log
diff --git a/flink/pom.xml b/flink/pom.xml
index 21ec1826554..e5dca7b5c64 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -637,7 +637,10 @@
false
1
false
- -Xmx3072m -XX:MaxPermSize=256m
+ -Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true
${project.build.directory}/flink-${flink.version}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index b089b363c83..af4de3d2942 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -137,6 +137,13 @@ int getDefaultSqlParallelism() {
return this.innerIntp.getDefaultSqlParallelism();
}
+ /**
+ * Workaround for FLINK-16936 to make the Table API work in multiple threads.
+ */
+ public void createPlannerAgain() {
+ this.innerIntp.createPlannerAgain();
+ }
+
public ClassLoader getFlinkScalaShellLoader() {
return innerIntp.getFlinkScalaShellLoader();
}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 970f6cfa007..5564a571aff 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -40,6 +40,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
private FlinkInterpreter flinkInterpreter;
private InterpreterContext curInterpreterContext;
private boolean opened = false;
+ private ClassLoader originalClassLoader;
public IPyFlinkInterpreter(Properties property) {
super(property);
@@ -78,16 +79,26 @@ protected Map<String, String> setupKernelEnv() throws IOException {
public InterpreterResult internalInterpret(String st,
InterpreterContext context)
throws InterpreterException {
- // set InterpreterContext in the python thread first, otherwise flink job could not be
- // associated with paragraph in JobListener
- this.curInterpreterContext = context;
- InterpreterResult result =
- super.internalInterpret("intp.setInterpreterContextInPythonThread()", context);
- if (result.code() != InterpreterResult.Code.SUCCESS) {
- throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
- result.toString());
+ try {
+ // set InterpreterContext in the python thread first, otherwise flink job could not be
+ // associated with paragraph in JobListener
+ this.curInterpreterContext = context;
+ InterpreterResult result =
+ super.internalInterpret("intp.initJavaThread()", context);
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ throw new InterpreterException("Fail to initJavaThread: " +
+ result.toString());
+ }
+ return super.internalInterpret(st, context);
+ } finally {
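+ // Ask the python process to restore the original classloader; skip the
+ // call if the kernel process has already exited.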
+ if (getKernelProcessLauncher().isRunning()) {
+ InterpreterResult result =
+ super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+ }
+ }
}
- return super.internalInterpret(st, context);
}
@Override
@@ -105,8 +116,23 @@ public void close() throws InterpreterException {
}
}
- public void setInterpreterContextInPythonThread() {
+ /**
+ * Called from the python process before each execution to bind the
+ * InterpreterContext and the Flink classloader to the current thread.
+ */
+ public void initJavaThread() {
InterpreterContext.set(curInterpreterContext);
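+ // Save the original context classloader so it can be restored later in
+ // resetClassLoaderInPythonThread, then switch to the Flink scala-shell
+ // classloader so that Flink classes and user jars are visible to this thread.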
+ originalClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
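+ // Recreate the blink planner for this thread (workaround for FLINK-16936).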
+ flinkInterpreter.createPlannerAgain();
+ }
+
+ /**
+ * Called from the python process after each execution to restore the
+ * original context classloader.
+ */
+ public void resetClassLoaderInPythonThread() {
+ if (originalClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
}
@Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 4ce4605437e..91ec0fe88af 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -46,6 +46,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
private FlinkInterpreter flinkInterpreter;
private InterpreterContext curInterpreterContext;
private boolean isOpened = false;
+ private ClassLoader originalClassLoader;
public PyFlinkInterpreter(Properties properties) {
super(properties);
@@ -103,22 +104,53 @@ public void open() throws InterpreterException {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
- if (isOpened) {
- // set InterpreterContext in the python thread first, otherwise flink job could not be
- // associated with paragraph in JobListener
- this.curInterpreterContext = context;
- InterpreterResult result =
- super.interpret("intp.setInterpreterContextInPythonThread()", context);
- if (result.code() != InterpreterResult.Code.SUCCESS) {
- throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
- result.toString());
+ try {
+ if (isOpened) {
+ // set InterpreterContext in the python thread first, otherwise flink job could not be
+ // associated with paragraph in JobListener
+ this.curInterpreterContext = context;
+ InterpreterResult result =
+ super.interpret("intp.initJavaThread()", context);
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ throw new InterpreterException("Fail to initJavaThread: " +
+ result.toString());
+ }
+ }
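+ // Recreate the planner for the current thread (workaround for FLINK-16936).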
+ flinkInterpreter.createPlannerAgain();
+ return super.interpret(st, context);
+ } finally {
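+ // Ask the python process to restore the original classloader; skip the
+ // call if the python process has already exited.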
+ if (getPythonProcessLauncher().isRunning()) {
+ InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+ }
}
}
- return super.interpret(st, context);
}
- public void setInterpreterContextInPythonThread() {
+ /**
+ * Called from the python process before each execution to bind the
+ * InterpreterContext and the Flink classloader to the current thread.
+ */
+ public void initJavaThread() {
InterpreterContext.set(curInterpreterContext);
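+ // Save the original context classloader so it can be restored later in
+ // resetClassLoaderInPythonThread, then switch to the Flink scala-shell
+ // classloader so that Flink classes and user jars are visible to this thread.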
+ originalClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
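+ // Recreate the blink planner for this thread (workaround for FLINK-16936).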
+ flinkInterpreter.createPlannerAgain();
+ }
+
+ /**
+ * Called from the python process after each execution to restore the
+ * original context classloader.
+ */
+ public void resetClassLoaderInPythonThread() {
+ if (originalClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ super.cancel(context);
+ flinkInterpreter.cancel(context);
}
@Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 5ab551e9aa9..6720bf2aef5 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -182,6 +182,20 @@ public StreamTableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentS
settings.isStreamingMode());
}
+ public void createPlanner(EnvironmentSettings settings) {
+ Map<String, String> executorProperties = settings.toExecutorProperties();
+ Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
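+ // The created planner is deliberately not stored or returned; this call is
+ // made for its side effect of re-initializing planner state for the calling
+ // thread (workaround for FLINK-16936, see createPlannerAgain).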
+ ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(
+ plannerProperties,
+ executor,
+ tblConfig,
+ blinkFunctionCatalog,
+ catalogManager);
+ }
+
public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
EnvironmentSettings settings) {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 4e6f3d0dda1..32962da65f4 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -79,6 +79,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
private var mode: ExecutionMode.Value = _
+ private var tblEnvFactory: TableEnvFactory = _
private var benv: ExecutionEnvironment = _
private var senv: StreamExecutionEnvironment = _
@@ -229,7 +230,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
val classLoader = Thread.currentThread().getContextClassLoader
try {
- // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could find
+ // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
// the TableFactory properly
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
@@ -299,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
- val tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
+ this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
// blink planner
@@ -547,7 +548,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
field.get(obj)
}
+ /**
+ * This is just a workaround to make the Table API work in multiple threads
+ * (FLINK-16936).
+ */
+ def createPlannerAgain(): Unit = {
+ val originalClassLoader = Thread.currentThread().getContextClassLoader
+ try {
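+ // TableFactoryService discovers factories through the context classloader,
+ // so switch to the Flink classloader while the planner is created.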
+ Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
+ val stEnvSetting =
+ EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
+ this.tblEnvFactory.createPlanner(stEnvSetting)
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader)
+ }
+ }
+
def interpret(code: String, context: InterpreterContext): InterpreterResult = {
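+ // Paragraphs may be executed from different threads, so re-create the
+ // planner before each run (workaround for FLINK-16936).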
+ createPlannerAgain()
val originalStdOut = System.out
val originalStdErr = System.err;
if (context != null) {
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 651645b3cf8..fb51d57c960 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -84,7 +84,7 @@ public void testSelect() throws InterpreterException, IOException {
// select which use scala udf
context = getInterpreterContext();
result = sqlInterpreter.interpret("SELECT addOne(id) as add_one FROM source_table", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index fda41ad37dc..eb678a219ec 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
@@ -45,6 +46,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
private RemoteInterpreterEventClient mockIntpEventClient =
mock(RemoteInterpreterEventClient.class);
+ private LazyOpenInterpreter flinkScalaInterpreter;
protected Properties initIntpProperties() {
Properties p = new Properties();
@@ -62,12 +64,12 @@ protected void startInterpreter(Properties properties) throws InterpreterExcepti
context.setIntpEventClient(mockIntpEventClient);
InterpreterContext.set(context);
- LazyOpenInterpreter flinkInterpreter = new LazyOpenInterpreter(
+ this.flinkScalaInterpreter = new LazyOpenInterpreter(
new FlinkInterpreter(properties));
intpGroup = new InterpreterGroup();
intpGroup.put("session_1", new ArrayList());
- intpGroup.get("session_1").add(flinkInterpreter);
- flinkInterpreter.setInterpreterGroup(intpGroup);
+ intpGroup.get("session_1").add(flinkScalaInterpreter);
+ flinkScalaInterpreter.setInterpreterGroup(intpGroup);
LazyOpenInterpreter pyFlinkInterpreter =
new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
@@ -94,17 +96,17 @@ public void tearDown() throws InterpreterException {
@Test
public void testBatchIPyFlink() throws InterpreterException {
- testBatchPyFlink(interpreter);
+ testBatchPyFlink(interpreter, flinkScalaInterpreter);
}
@Test
- public void testStreamIPyFlink() throws InterpreterException {
- testStreamPyFlink(interpreter);
+ public void testStreamIPyFlink() throws InterpreterException, IOException {
+ testStreamPyFlink(interpreter, flinkScalaInterpreter);
}
- public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterException {
+ public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
- InterpreterResult result = interpreter.interpret(
+ InterpreterResult result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
"import shutil\n" +
@@ -131,6 +133,77 @@ public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterE
"bt_env.execute(\"batch_job\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // use group by
+ context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+ result = pyflinkInterpreter.interpret(
+ "import tempfile\n" +
+ "import os\n" +
+ "import shutil\n" +
+ "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+ "if os.path.exists(sink_path):\n" +
+ " if os.path.isfile(sink_path):\n" +
+ " os.remove(sink_path)\n" +
+ " else:\n" +
+ " shutil.rmtree(sink_path)\n" +
+ "b_env.set_parallelism(1)\n" +
+ "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+ "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+ " .with_format(OldCsv()\n" +
+ " .field_delimiter(',')\n" +
+ " .field(\"a\", DataTypes.STRING())\n" +
+ " .field(\"b\", DataTypes.BIGINT())\n" +
+ " .field(\"c\", DataTypes.BIGINT())) \\\n" +
+ " .with_schema(Schema()\n" +
+ " .field(\"a\", DataTypes.STRING())\n" +
+ " .field(\"b\", DataTypes.BIGINT())\n" +
+ " .field(\"c\", DataTypes.BIGINT())) \\\n" +
+ " .register_table_sink(\"batch_sink4\")\n" +
+ "t.group_by(\"c\").select(\"c, sum(a), count(b)\").insert_into(\"batch_sink4\")\n" +
+ "bt_env.execute(\"batch_job4\")"
+ , context);
+ assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+ // use scala udf in pyflink
+ // define scala udf
+ result = flinkScalaInterpreter.interpret(
+ "class AddOne extends ScalarFunction {\n" +
+ " def eval(a: java.lang.Long): String = a + \"\1\"\n" +
+ "}", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ result = flinkScalaInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+ context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+ result = pyflinkInterpreter.interpret(
+ "import tempfile\n" +
+ "import os\n" +
+ "import shutil\n" +
+ "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+ "if os.path.exists(sink_path):\n" +
+ " if os.path.isfile(sink_path):\n" +
+ " os.remove(sink_path)\n" +
+ " else:\n" +
+ " shutil.rmtree(sink_path)\n" +
+ "b_env.set_parallelism(1)\n" +
+ "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+ "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+ " .with_format(OldCsv()\n" +
+ " .field_delimiter(',')\n" +
+ " .field(\"a\", DataTypes.BIGINT())\n" +
+ " .field(\"b\", DataTypes.STRING())\n" +
+ " .field(\"c\", DataTypes.STRING())) \\\n" +
+ " .with_schema(Schema()\n" +
+ " .field(\"a\", DataTypes.BIGINT())\n" +
+ " .field(\"b\", DataTypes.STRING())\n" +
+ " .field(\"c\", DataTypes.STRING())) \\\n" +
+ " .register_table_sink(\"batch_sink3\")\n" +
+ "t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" +
+ "bt_env.execute(\"batch_job3\")"
+ , context);
+ assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code());
}
@Override
@@ -149,33 +222,33 @@ public void testIPythonFailToLaunch() throws InterpreterException {
}
}
- public static void testStreamPyFlink(Interpreter interpreter) throws InterpreterException {
+ public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
InterpreterResult result = interpreter.interpret(
- "import tempfile\n" +
- "import os\n" +
- "import shutil\n" +
- "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
- "if os.path.exists(sink_path):\n" +
- " if os.path.isfile(sink_path):\n" +
- " os.remove(sink_path)\n" +
- " else:\n" +
- " shutil.rmtree(sink_path)\n" +
- "s_env.set_parallelism(1)\n" +
- "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
- "st_env.connect(FileSystem().path(sink_path)) \\\n" +
- " .with_format(OldCsv()\n" +
- " .field_delimiter(',')\n" +
- " .field(\"a\", DataTypes.BIGINT())\n" +
- " .field(\"b\", DataTypes.STRING())\n" +
- " .field(\"c\", DataTypes.STRING())) \\\n" +
- " .with_schema(Schema()\n" +
- " .field(\"a\", DataTypes.BIGINT())\n" +
- " .field(\"b\", DataTypes.STRING())\n" +
- " .field(\"c\", DataTypes.STRING())) \\\n" +
- " .register_table_sink(\"stream_sink\")\n" +
- "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
- "st_env.execute(\"stream_job\")"
+ "import tempfile\n" +
+ "import os\n" +
+ "import shutil\n" +
+ "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+ "if os.path.exists(sink_path):\n" +
+ " if os.path.isfile(sink_path):\n" +
+ " os.remove(sink_path)\n" +
+ " else:\n" +
+ " shutil.rmtree(sink_path)\n" +
+ "s_env.set_parallelism(1)\n" +
+ "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+ "st_env.connect(FileSystem().path(sink_path)) \\\n" +
+ " .with_format(OldCsv()\n" +
+ " .field_delimiter(',')\n" +
+ " .field(\"a\", DataTypes.BIGINT())\n" +
+ " .field(\"b\", DataTypes.STRING())\n" +
+ " .field(\"c\", DataTypes.STRING())) \\\n" +
+ " .with_schema(Schema()\n" +
+ " .field(\"a\", DataTypes.BIGINT())\n" +
+ " .field(\"b\", DataTypes.STRING())\n" +
+ " .field(\"c\", DataTypes.STRING())) \\\n" +
+ " .register_table_sink(\"stream_sink\")\n" +
+ "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
+ "st_env.execute(\"stream_job\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index a42d59407f7..7bbc1dd9bc6 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -19,7 +19,6 @@
import com.google.common.io.Files;
-import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,6 +31,8 @@
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.PythonInterpreterTest;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -48,7 +49,7 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
private RemoteInterpreterEventClient mockRemoteEventClient =
mock(RemoteInterpreterEventClient.class);
- private Interpreter flinkInterpreter;
+ private Interpreter flinkScalaInterpreter;
private Interpreter streamSqlInterpreter;
private Interpreter batchSqlInterpreter;
@@ -77,9 +78,9 @@ public void setUp() throws InterpreterException {
.setIntpEventClient(mockRemoteEventClient)
.build();
InterpreterContext.set(context);
- flinkInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
- intpGroup.get("session_1").add(flinkInterpreter);
- flinkInterpreter.setInterpreterGroup(intpGroup);
+ flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
+ intpGroup.get("session_1").add(flinkScalaInterpreter);
+ flinkScalaInterpreter.setInterpreterGroup(intpGroup);
LazyOpenInterpreter iPyFlinkInterpreter =
new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
@@ -108,9 +109,9 @@ public void tearDown() {
}
@Test
- public void testPyFlink() throws InterpreterException {
- IPyFlinkInterpreterTest.testBatchPyFlink(interpreter);
- IPyFlinkInterpreterTest.testStreamPyFlink(interpreter);
+ public void testPyFlink() throws InterpreterException, IOException {
+ IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+ IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
}
protected InterpreterContext getInterpreterContext() {
diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties
index 0d84434e457..8017840fdc8 100644
--- a/flink/src/test/resources/log4j.properties
+++ b/flink/src/test/resources/log4j.properties
@@ -15,11 +15,12 @@
# limitations under the License.
#
-log4j.rootLogger = WARN, stdout
+log4j.rootLogger = INFO, stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
log4j.logger.org.apache.hive=WARN
+log4j.logger.org.apache.flink=WARN
diff --git a/flink/src/test/resources/log4j2.properties b/flink/src/test/resources/log4j2.properties
index cf94a3ec1bb..1bce906a04c 100755
--- a/flink/src/test/resources/log4j2.properties
+++ b/flink/src/test/resources/log4j2.properties
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-status = WARN
+status = INFO
name = HiveLog4j2
packages = org.apache.hadoop.hive.ql.log
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index 0b55018c458..e403a59bb56 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.python;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.gson.Gson;
import org.apache.commons.exec.CommandLine;
@@ -60,7 +59,7 @@ public class PythonInterpreter extends Interpreter {
private static final int MAX_TIMEOUT_SEC = 30;
private GatewayServer gatewayServer;
- private PythonProcessLauncher pythonProcessLauncher;
+ protected PythonProcessLauncher pythonProcessLauncher;
private File pythonWorkDir;
protected boolean useBuiltinPy4j = true;
@@ -163,7 +162,6 @@ private void createGatewayServerAndStartScript() throws IOException {
}
}
- @VisibleForTesting
public PythonProcessLauncher getPythonProcessLauncher() {
return pythonProcessLauncher;
}
@@ -572,7 +570,7 @@ public void logPythonOutput(String message) {
LOGGER.debug("Python Process Output: " + message);
}
- class PythonProcessLauncher extends ProcessLauncher {
+ public class PythonProcessLauncher extends ProcessLauncher {
PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
super(commandLine, envs);
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index cabcabcc5ba..f3361a47e3e 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -146,7 +146,7 @@ public void testIpythonKernelCrash_shouldNotHangExecution()
assertEquals(Code.ERROR, result.code());
output = context.out.toInterpreterResultMessage().get(0);
assertTrue(output.getData(),
- output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+ output.getData().contains("Ipython kernel has been stopped. Please check logs. "
+ "It might be because of an out of memory issue."));
}
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
index 5f8164f012c..dff1900fd59 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.jupyter;
-import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.environment.EnvironmentUtils;
@@ -213,7 +212,6 @@ protected Map setupKernelEnv() throws IOException {
return EnvironmentUtils.getProcEnvironment();
}
- @VisibleForTesting
public JupyterKernelProcessLauncher getKernelProcessLauncher() {
return jupyterKernelProcessLauncher;
}