1 change: 1 addition & 0 deletions .travis.yml
@@ -239,3 +239,4 @@ after_failure:
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
- cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
- cat flink/*.log
5 changes: 4 additions & 1 deletion flink/pom.xml
@@ -637,7 +637,10 @@
<skip>false</skip>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Xmx3072m -XX:MaxPermSize=256m </argLine>
<!-- set sun.zip.disableMemoryMapping=true because of
https://blogs.oracle.com/poonam/crashes-in-zipgetentry
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
<argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>

<environmentVariables>
<FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
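A quick aside on the new argLine: -XX:MaxPermSize was a JDK 7-era flag (the permanent generation is gone since JDK 8), so the swap to -XX:MaxMetaspaceSize is the modern equivalent, and sun.zip.disableMemoryMapping avoids the ZIP_GetEntry crashes linked in the comment. A tiny sketch, with an illustrative class name, of how a forked test JVM could confirm the property is actually set:

```java
// Illustrative check, not part of the PR: run inside the forked surefire JVM.
public class ArgLineCheck {
    public static void main(String[] args) {
        // Prints "true" when launched with the argLine configured above.
        System.out.println(System.getProperty("sun.zip.disableMemoryMapping"));
    }
}
```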
@@ -137,6 +137,13 @@ int getDefaultSqlParallelism() {
return this.innerIntp.getDefaultSqlParallelism();
}

  /**
   * Workaround for FLINK-16936.
   */
public void createPlannerAgain() {
this.innerIntp.createPlannerAgain();
}

public ClassLoader getFlinkScalaShellLoader() {
return innerIntp.getFlinkScalaShellLoader();
}
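FlinkInterpreter only delegates here; the actual planner re-creation lives in FlinkScalaInterpreter further down. A hypothetical sketch (every name outside the diff is illustrative) of the contract the workaround implies, namely calling createPlannerAgain() before Table API work on a fresh thread:

```java
// Hypothetical illustration, not part of the PR.
public class PlannerWorkaroundDemo {
    /** Reduced to the one method visible in the diff above. */
    interface PlannerHolder {
        void createPlannerAgain();
    }

    static Thread runOnNewThread(PlannerHolder flink, Runnable tableApiWork) {
        Thread t = new Thread(() -> {
            flink.createPlannerAgain(); // FLINK-16936 workaround comes first
            tableApiWork.run();         // then Table API calls are safe
        });
        t.start();
        return t;
    }
}
```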
@@ -40,6 +40,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
private FlinkInterpreter flinkInterpreter;
private InterpreterContext curInterpreterContext;
private boolean opened = false;
private ClassLoader originalClassLoader;

public IPyFlinkInterpreter(Properties property) {
super(property);
@@ -78,16 +79,26 @@ protected Map<String, String> setupKernelEnv() throws IOException {
public InterpreterResult internalInterpret(String st,
InterpreterContext context)
throws InterpreterException {
// set InterpreterContext in the python thread first, otherwise flink job could not be
// associated with paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.internalInterpret("intp.setInterpreterContextInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
result.toString());
try {
      // set InterpreterContext in the python thread first, otherwise the Flink job cannot be
      // associated with the paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.internalInterpret("intp.initJavaThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
        throw new InterpreterException("Failed to initJavaThread: " +
result.toString());
}
return super.internalInterpret(st, context);
} finally {
if (getKernelProcessLauncher().isRunning()) {
InterpreterResult result =
super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
          LOGGER.warn("Failed to resetClassLoaderInPythonThread: " + result.toString());
}
}
}
return super.internalInterpret(st, context);
}

@Override
@@ -105,8 +116,23 @@ public void close() throws InterpreterException {
}
}

public void setInterpreterContextInPythonThread() {
  /**
   * Called by the Python process.
   */
public void initJavaThread() {
InterpreterContext.set(curInterpreterContext);
originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
flinkInterpreter.createPlannerAgain();
}

  /**
   * Called by the Python process.
   */
public void resetClassLoaderInPythonThread() {
if (originalClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}

@Override
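The rewritten internalInterpret brackets every paragraph with an init/reset pair: initJavaThread pins the InterpreterContext and swaps the context classloader in the python thread, and the finally block restores it even when interpretation fails. A generic, self-contained sketch of that shape (all names illustrative, not from the PR):

```java
import java.util.function.Supplier;

// Generic sketch of the init/try/finally bracketing used above; "Peer" stands
// in for the python-side kernel process.
public class HookedInterpretDemo {
    interface Peer {
        boolean run(String command); // true on success
        boolean isRunning();
    }

    static String interpret(Peer peer, Supplier<String> work) {
        try {
            if (!peer.run("intp.initJavaThread()")) {
                throw new IllegalStateException("Failed to initJavaThread");
            }
            return work.get();
        } finally {
            // Best effort: only reset while the peer process is still alive.
            if (peer.isRunning() && !peer.run("intp.resetClassLoaderInPythonThread()")) {
                System.err.println("Failed to resetClassLoaderInPythonThread");
            }
        }
    }
}
```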
@@ -46,6 +46,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
private FlinkInterpreter flinkInterpreter;
private InterpreterContext curInterpreterContext;
private boolean isOpened = false;
private ClassLoader originalClassLoader;

public PyFlinkInterpreter(Properties properties) {
super(properties);
@@ -103,22 +104,53 @@ public void open() throws InterpreterException {

@Override
public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
if (isOpened) {
// set InterpreterContext in the python thread first, otherwise flink job could not be
// associated with paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.interpret("intp.setInterpreterContextInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
result.toString());
try {
if (isOpened) {
        // set InterpreterContext in the python thread first, otherwise the Flink job cannot be
        // associated with the paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.interpret("intp.initJavaThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
          throw new InterpreterException("Failed to initJavaThread: " +
result.toString());
}
}
flinkInterpreter.createPlannerAgain();
return super.interpret(st, context);
} finally {
if (getPythonProcessLauncher().isRunning()) {
InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
          LOGGER.warn("Failed to resetClassLoaderInPythonThread: " + result.toString());
}
}
}
return super.interpret(st, context);
}

public void setInterpreterContextInPythonThread() {
  /**
   * Called by the Python process.
   */
public void initJavaThread() {
InterpreterContext.set(curInterpreterContext);
originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
flinkInterpreter.createPlannerAgain();
}

  /**
   * Called by the Python process.
   */
public void resetClassLoaderInPythonThread() {
if (originalClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}

@Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);
flinkInterpreter.cancel(context);
}

@Override
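Taken together, initJavaThread and resetClassLoaderInPythonThread are one save/switch/restore of the thread context classloader, split across two calls because the swap has to span the whole paragraph execution in the python thread. Collapsed into a single scope, the underlying idiom is (illustrative helper, not in the PR):

```java
import java.util.concurrent.Callable;

// Illustrative helper, not part of the PR: the same classloader swap the two
// methods above perform, collapsed into one try/finally scope.
public final class ClassLoaderScope {
    private ClassLoaderScope() {}

    static <T> T runWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader(); // what initJavaThread saves
        try {
            current.setContextClassLoader(loader);              // ...and switches to
            return action.call();
        } finally {
            current.setContextClassLoader(original);            // what the reset call restores
        }
    }
}
```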
14 changes: 14 additions & 0 deletions flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -182,6 +182,20 @@ public StreamTableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentS
settings.isStreamingMode());
}

public void createPlanner(EnvironmentSettings settings) {
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());

Map<String, String> plannerProperties = settings.toPlannerProperties();
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tblConfig,
blinkFunctionCatalog,
catalogManager);
}

public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
EnvironmentSettings settings) {

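Note that createPlanner builds a Planner and immediately discards it; per the FLINK-16936 comments elsewhere in this PR, the construction side effects are the point. A hypothetical driver, assuming the Flink 1.10-era EnvironmentSettings builder API:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.zeppelin.flink.TableEnvFactory;

// Hypothetical driver, not part of the PR: the Planner instance is dropped;
// createPlanner() is called only for its side effects.
public class PlannerRefreshDemo {
    static void refresh(TableEnvFactory factory) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        factory.createPlanner(settings); // mirrors createPlannerAgain() below
    }
}
```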
@@ -79,6 +79,7 @@ class FlinkScalaInterpreter(val properties: Properties) {

private var mode: ExecutionMode.Value = _

private var tblEnvFactory: TableEnvFactory = _
private var benv: ExecutionEnvironment = _
private var senv: StreamExecutionEnvironment = _

@@ -229,7 +230,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
val classLoader = Thread.currentThread().getContextClassLoader
try {
// use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could find
// use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
// the TableFactory properly
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
@@ -299,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);

val tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)

// blink planner
@@ -547,7 +548,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
field.get(obj)
}

  /**
   * Workaround (FLINK-16936) to make the Table API work across multiple threads.
   */
def createPlannerAgain(): Unit = {
val originalClassLoader = Thread.currentThread().getContextClassLoader
try {
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val stEnvSetting =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
this.tblEnvFactory.createPlanner(stEnvSetting)
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
}
}

def interpret(code: String, context: InterpreterContext): InterpreterResult = {
createPlannerAgain()
val originalStdOut = System.out
val originalStdErr = System.err;
if (context != null) {
@@ -84,7 +84,7 @@ public void testSelect() throws InterpreterException, IOException {
// select which use scala udf
context = getInterpreterContext();
result = sqlInterpreter.interpret("SELECT addOne(id) as add_one FROM source_table", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
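The one-line test change threads the captured paragraph output into assertEquals as the failure message, so a red CI run shows what the interpreter actually emitted instead of a bare code mismatch. A minimal sketch of the JUnit 4 three-argument overload it relies on:

```java
import static org.junit.Assert.assertEquals;

// Minimal sketch of the assertion pattern above: the first argument is a
// message that surfaces only when the assertion fails.
public class AssertWithMessageDemo {
    public static void main(String[] args) {
        String capturedOutput = "stand-in for new String(context.out.toByteArray())";
        assertEquals(capturedOutput, "SUCCESS", "SUCCESS"); // passes silently
    }
}
```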