diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 4565fc09b4e..f02c21b8ddd 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,13 +17,10 @@ package org.apache.zeppelin.flink; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.scala.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -48,18 +45,33 @@ public class FlinkInterpreter extends Interpreter { public FlinkInterpreter(Properties properties) { super(properties); - this.innerIntp = new FlinkScalaInterpreter(getProperties()); + } + + private void checkScalaVersion() throws InterpreterException { + String scalaVersionString = scala.util.Properties.versionString(); + LOGGER.info("Using Scala: " + scalaVersionString); + if (scalaVersionString.contains("version 2.11")) { + return; + } else { + throw new InterpreterException("Unsupported scala version: " + scalaVersionString + + ", Only scala 2.11 is supported"); + } } @Override public void open() throws InterpreterException { + checkScalaVersion(); + + this.innerIntp = new FlinkScalaInterpreter(getProperties()); this.innerIntp.open(); this.z = this.innerIntp.getZeppelinContext(); } @Override public void close() throws InterpreterException { - this.innerIntp.close(); + if (this.innerIntp != null) { + this.innerIntp.close(); + } } @Override