diff --git a/bin/common.sh b/bin/common.sh index d9cb81298a8..25cc14860ee 100644 --- a/bin/common.sh +++ b/bin/common.sh @@ -145,7 +145,7 @@ export JAVA_OPTS JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}" if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then - JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" + JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2.properties'" else JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties" fi diff --git a/conf/log4j2.properties b/conf/log4j2.properties new file mode 100644 index 00000000000..8e6f949ed60 --- /dev/null +++ b/conf/log4j2.properties @@ -0,0 +1,53 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This affects logging for both user code and Flink +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = MainAppender + +# Uncomment this if you want to _only_ change Flink's logging +#logger.flink.name = org.apache.flink +#logger.flink.level = INFO + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. 
+logger.akka.name = akka +logger.akka.level = INFO +logger.kafka.name= org.apache.kafka +logger.kafka.level = INFO +logger.hadoop.name = org.apache.hadoop +logger.hadoop.level = INFO +logger.zookeeper.name = org.apache.zookeeper +logger.zookeeper.level = INFO + +logger.flink.name = org.apache.zeppelin.flink +logger.flink.level = DEBUG + + +# Log all infos in the given file +appender.main.name = MainAppender +appender.main.type = File +appender.main.append = false +appender.main.fileName = ${sys:zeppelin.log.file} +appender.main.layout.type = PatternLayout +appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# Suppress the irrelevant (wrong) warnings from the Netty channel handler +logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = OFF diff --git a/flink/flink-shims/pom.xml b/flink/flink-shims/pom.xml new file mode 100644 index 00000000000..5ca05688cdc --- /dev/null +++ b/flink/flink-shims/pom.xml @@ -0,0 +1,53 @@ + + + + + + + flink-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + flink-shims + 0.9.0-SNAPSHOT + jar + Zeppelin: Flink Shims + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + \ No newline at end of file diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java new file mode 100644 index 00000000000..ef5f0a0af06 --- /dev/null +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + + +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.InetAddress; +import java.util.List; +import java.util.Properties; + +/** + * This is abstract class for anything that is api incompatible between different flink versions. It will + * load the correct version of FlinkShims based on the version of flink. 
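 *
 * A rough usage sketch (illustrative only; the "1.10.0" literal and the
 * properties variable are placeholders, not part of this patch):
 *
 * <pre>{@code
 *   FlinkVersion version = FlinkVersion.fromVersionString("1.10.0");
 *   FlinkShims shims = FlinkShims.getInstance(version, properties);
 *   String pyFlinkPath = shims.getPyFlinkPythonPath(properties);
 * }</pre>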
+ */ +public abstract class FlinkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlinkShims.class); + + private static FlinkShims flinkShims; + + protected Properties properties; + + public FlinkShims(Properties properties) { + this.properties = properties; + } + + private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties properties) + throws Exception { + Class flinkShimsClass; + if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) { + LOGGER.info("Initializing shims for Flink 1.10"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() >= 11) { + LOGGER.info("Initializing shims for Flink 1.11"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims"); + } else { + throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); + } + + Constructor c = flinkShimsClass.getConstructor(Properties.class); + return (FlinkShims) c.newInstance(properties); + } + + /** + * + * @param flinkVersion + * @param properties + * @return + */ + public static FlinkShims getInstance(FlinkVersion flinkVersion, + Properties properties) throws Exception { + if (flinkShims == null) { + flinkShims = loadShims(flinkVersion, properties); + } + return flinkShims; + } + + public abstract Object createCatalogManager(Object config); + + public abstract String getPyFlinkPythonPath(Properties properties) throws IOException; + + public abstract Object getCollectStreamTableSink(InetAddress targetAddress, + int targetPort, + Object serializer); + + public abstract List collectToList(Object table) throws Exception; + + public abstract void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception; + + public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception; + + public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception; + + public abstract boolean rowEquals(Object row1, Object row2); + + public abstract Object fromDataSet(Object btenv, Object ds); + + public abstract Object toDataSet(Object btenv, Object table); + + public abstract void registerTableFunction(Object btenv, String name, Object tableFunction); + + public abstract void registerAggregateFunction(Object btenv, String name, Object aggregateFunction); + + public abstract void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction); + + public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink); +} diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java new file mode 100644 index 00000000000..c0566adce69 --- /dev/null +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FlinkVersion {
+  private static final Logger logger = LoggerFactory.getLogger(FlinkVersion.class);
+
+  private int majorVersion;
+  private int minorVersion;
+  private int patchVersion;
+  private String versionString;
+
+  FlinkVersion(String versionString) {
+    this.versionString = versionString;
+
+    try {
+      int pos = versionString.indexOf('-');
+
+      String numberPart = versionString;
+      if (pos > 0) {
+        numberPart = versionString.substring(0, pos);
+      }
+
+      String[] versions = numberPart.split("\\.");
+      this.majorVersion = Integer.parseInt(versions[0]);
+      this.minorVersion = Integer.parseInt(versions[1]);
+      if (versions.length == 3) {
+        this.patchVersion = Integer.parseInt(versions[2]);
+      }
+
+    } catch (Exception e) {
+      logger.error("Cannot recognize Flink version " + versionString +
+          ". Assume it's a future release", e);
+    }
+  }
+
+  public int getMajorVersion() {
+    return majorVersion;
+  }
+
+  public int getMinorVersion() {
+    return minorVersion;
+  }
+
+  public String toString() {
+    return versionString;
+  }
+
+  public static FlinkVersion fromVersionString(String versionString) {
+    return new FlinkVersion(versionString);
+  }
+
+  public boolean isFlink110() {
+    return this.majorVersion == 1 && minorVersion == 10;
+  }
+}
diff --git a/flink/flink1.10-shims/pom.xml b/flink/flink1.10-shims/pom.xml
new file mode 100644
index 00000000000..8a60436e242
--- /dev/null
+++ b/flink/flink1.10-shims/pom.xml
@@ -0,0 +1,228 @@
+ + + + + flink-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + flink1.10-shims + 0.9.0-SNAPSHOT + jar + Zeppelin: Flink1.10 Shims + + + 1.10.0 + 2.11 + 2.11.12 + + + + + + org.apache.zeppelin + flink-shims + ${project.version} + + + + org.apache.flink + flink-core + ${flink.version} + provided + + + + org.apache.flink + flink-clients_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-runtime_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala-bridge_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java-bridge_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-java_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner-blink_2.11 + ${flink.version} + provided + + + org.reflections + reflections + + + + + + org.apache.flink + flink-table-planner_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-python_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-scala-shell_2.11 + ${flink.version} + provided + + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + 
eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + ${scala.version} + + -unchecked + -deprecation + -feature + -target:jvm-1.8 + + + -Xms1024m + -Xmx1024m + -XX:MaxMetaspaceSize=${MaxMetaspace} + + + -source + ${java.version} + -target + ${java.version} + -Xlint:all,-serial,-path,-options + + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + \ No newline at end of file diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java new file mode 100644 index 00000000000..dec35600c51 --- /dev/null +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.scala.DataSet; +import org.apache.flink.python.util.ResourceUtil; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableUtils; +import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.scala.BatchTableEnvironment; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.TableAggregateFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; +import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; +import org.apache.zeppelin.flink.shims111.Flink110ScalaShims; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.file.Files; +import java.util.List; +import java.util.Properties; + + +/** + * Shims for flink 1.10 + */ +public class Flink110Shims extends FlinkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink110Shims.class); + + public Flink110Shims(Properties properties) { + super(properties); + } + + @Override + public Object createCatalogManager(Object config) { + return new CatalogManager("default_catalog", + new GenericInMemoryCatalog("default_catalog", "default_database")); + } + + + @Override + public String getPyFlinkPythonPath(Properties properties) throws 
IOException { + String flinkHome = System.getenv("FLINK_HOME"); + if (flinkHome != null) { + File tmpDir = Files.createTempDirectory("zeppelin").toFile(); + List depFiles = null; + try { + depFiles = ResourceUtil.extractBuiltInDependencies(tmpDir.getAbsolutePath(), "pyflink", true); + } catch (InterruptedException e) { + throw new IOException(e); + } + StringBuilder builder = new StringBuilder(); + for (File file : depFiles) { + LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath()); + builder.append(file.getAbsolutePath() + ":"); + } + return builder.toString(); + } else { + throw new IOException("No FLINK_HOME is specified"); + } + } + + @Override + public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { + return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer>) serializer); + } + + @Override + public List collectToList(Object table) throws Exception { + return TableUtils.collectToList((Table) table); + } + + @Override + public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception { + + } + + @Override + public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception { + ((TableEnvironment) tblEnv).sqlUpdate(sql); + } + + @Override + public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception { + ((TableEnvironment) tblEnv).execute(sql); + return true; + } + + @Override + public boolean rowEquals(Object row1, Object row2) { + return ((Row)row1).equals((Row) row2); + } + + public Object fromDataSet(Object btenv, Object ds) { + return Flink110ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); + } + + @Override + public Object toDataSet(Object btenv, Object table) { + return Flink110ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); + } + + @Override + public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { + ((TableEnvironment) stenv).registerTableSink(tableName, (TableSink) collectTableSink); + } + + @Override + public void registerTableFunction(Object btenv, String name, Object tableFunction) { + ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction); + } + + @Override + public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { + ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); + } + + @Override + public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { + ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); + } +} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java similarity index 98% rename from flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java rename to flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java index 74911aebeb6..b7a0ea79bca 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/CollectStreamTableSink.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.zeppelin.flink.sql; +package org.apache.zeppelin.flink.shims111; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; diff --git a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala new file mode 100644 index 00000000000..cfd8894f4c7 --- /dev/null +++ b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink.shims111 + +import org.apache.flink.api.scala.{DataSet, FlinkILoop} +import org.apache.flink.table.api.Table +import org.apache.flink.table.api.scala.BatchTableEnvironment +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl, StreamTableEnvironmentImpl} + +object Flink110ScalaShims { + + def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { + btenv.fromDataSet(ds) + } + + def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { + btenv.toDataSet[Row](table) + } +} diff --git a/flink/flink1.11-shims/pom.xml b/flink/flink1.11-shims/pom.xml new file mode 100644 index 00000000000..458e560e704 --- /dev/null +++ b/flink/flink1.11-shims/pom.xml @@ -0,0 +1,221 @@ + + + + + + flink-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + flink1.11-shims + 0.9.0-SNAPSHOT + jar + Zeppelin: Flink1.11 Shims + + + 1.11-SNAPSHOT + 2.11 + 2.11.12 + + + + + + org.apache.zeppelin + flink-shims + ${project.version} + + + + org.apache.flink + flink-core + ${flink.version} + provided + + + + org.apache.flink + flink-clients_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-runtime_2.11 + ${flink.version} + provided + + + + + org.apache.flink + flink-table-api-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala-bridge_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java-bridge_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-java_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner-blink_2.11 + ${flink.version} + provided + + + org.reflections + reflections + + + + + + org.apache.flink + flink-table-planner_2.11 + ${flink.version} + provided + + + + 
org.apache.flink + flink-python_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-scala-shell_2.11 + ${flink.version} + provided + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + ${scala.version} + + -unchecked + -deprecation + -feature + -target:jvm-1.8 + + + -Xms1024m + -Xmx1024m + -XX:MaxMetaspaceSize=${MaxMetaspace} + + + -source + ${java.version} + -target + ${java.version} + -Xlint:all,-serial,-path,-options + + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + \ No newline at end of file diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java new file mode 100644 index 00000000000..ea11cede8b9 --- /dev/null +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.compress.utils.Lists; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.scala.DataSet; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.TableAggregateFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; +import org.apache.zeppelin.flink.shims111.Flink111ScalaShims; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Shims for flink 1.11 + */ +public class Flink111Shims extends FlinkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class); + + private Map statementSetMap = new ConcurrentHashMap<>(); + + public Flink111Shims(Properties properties) { + super(properties); + } + + @Override + public Object createCatalogManager(Object config) { + return CatalogManager.newBuilder() + .classLoader(Thread.currentThread().getContextClassLoader()) + .config((ReadableConfig) config) + .defaultCatalog("default_catalog", + new GenericInMemoryCatalog("default_catalog", "default_database")) + .build(); + } + + @Override + public String getPyFlinkPythonPath(Properties properties) throws IOException { + String flinkHome = System.getenv("FLINK_HOME"); + if (flinkHome != null) { + List depFiles = null; + depFiles = Arrays.asList(new File(flinkHome + "/opt/python").listFiles()); + StringBuilder builder = new StringBuilder(); + for (File file : depFiles) { + LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath()); + builder.append(file.getAbsolutePath() + ":"); + } + return builder.toString(); + } else { + throw new IOException("No FLINK_HOME is specified"); + } + } + + @Override + public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { + return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer>) serializer); + } + + @Override + public List collectToList(Object table) throws Exception { + return Lists.newArrayList(((Table) table).execute().collect()); + } + + @Override + public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception { + StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet(); + statementSetMap.put(context.getParagraphId(), statementSet); + } + + @Override + public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) 
throws Exception { + statementSetMap.get(context.getParagraphId()).addInsertSql(sql); + } + + @Override + public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception { + JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get(); + while(!jobClient.getJobStatus().get().isTerminalState()) { + LOGGER.debug("Wait for job to finish"); + Thread.sleep(1000 * 5); + } + if (jobClient.getJobStatus().get() == JobStatus.CANCELED) { + context.out.write("Job is cancelled.\n"); + return false; + } + return true; + } + + @Override + public boolean rowEquals(Object row1, Object row2) { + Row r1 = (Row) row1; + Row r2 = (Row) row2; + r1.setKind(RowKind.INSERT); + r2.setKind(RowKind.INSERT); + return r1.equals(r2); + } + + @Override + public Object fromDataSet(Object btenv, Object ds) { + return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); + } + + @Override + public Object toDataSet(Object btenv, Object table) { + return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); + } + + @Override + public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { + ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) + .registerTableSinkInternal(tableName, (TableSink) collectTableSink); + } + + @Override + public void registerTableFunction(Object btenv, String name, Object tableFunction) { + ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction); + } + + @Override + public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { + ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); + } + + @Override + public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { + ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); + } + +} diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java new file mode 100644 index 00000000000..b98f406e63d --- /dev/null +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink.shims111; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Table sink for collecting the results locally using sockets. + */ +public class CollectStreamTableSink implements RetractStreamTableSink { + + private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); + + private final InetAddress targetAddress; + private final int targetPort; + private final TypeSerializer> serializer; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + public CollectStreamTableSink(InetAddress targetAddress, + int targetPort, + TypeSerializer> serializer) { + LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.serializer = serializer; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + final CollectStreamTableSink copy = + new CollectStreamTableSink(targetAddress, targetPort, serializer); + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + @Override + public TypeInformation getRecordType() { + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + @Override + public DataStreamSink consumeDataStream(DataStream> stream) { + // add sink + return stream + .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) + .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) + .setParallelism(1); + } + + @Override + public TupleTypeInfo> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } +} diff --git a/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala b/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala new file mode 100644 index 00000000000..abdaca2852c --- /dev/null +++ b/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink.shims111 + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.table.api.Table +import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment +import org.apache.flink.types.Row + +object Flink111ScalaShims { + + def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { + btenv.fromDataSet(ds) + } + + def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { + btenv.toDataSet[Row](table) + } +} diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml new file mode 100644 index 00000000000..bb991a391c8 --- /dev/null +++ b/flink/interpreter/pom.xml @@ -0,0 +1,919 @@ + + + + + 4.0.0 + + + flink-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + org.apache.zeppelin + zeppelin-flink + jar + 0.9.0-SNAPSHOT + Zeppelin: Flink + Zeppelin Flink Interpreter + + + + flink + + 1.10.0 + 2.6.5 + 2.3.4 + 4.0.0 + 1.15.0 + + 2.0.1 + 2.11 + 2.11.12 + + https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${scala.binary.version}.tgz + + + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + log4j + log4j + + + + org.apache.zeppelin + flink-shims + ${project.version} + + + + org.apache.zeppelin + flink1.10-shims + ${project.version} + + + + org.apache.zeppelin + flink1.11-shims + ${project.version} + + + + org.apache.zeppelin + zeppelin-python + ${project.version} + + + io.atomix + * + + + com.google.guava + guava + + + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + io.atomix + * + + + com.google.guava + guava + + + io.grpc + * + + + + + + ${project.groupId} + zeppelin-python + ${project.version} + tests + test + + + io.atomix + * + + + com.google.guava + guava + + + io.grpc + * + + + + + + org.jline + jline-terminal + 3.9.0 + + + + org.jline + jline-reader + 3.9.0 + + + + org.apache.flink + flink-python_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-core + ${flink.version} + provided + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-yarn_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-shaded-hadoop2 + + + org.apache.hadoop + * + + + org.eclipse.jetty + * + + + io.netty + netty + + + + + + org.apache.flink + flink-table-api-scala_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala-bridge_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-scala_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-scala-shell_${scala.binary.version} + ${flink.version} + + + + org.apache.flink + flink-streaming-java_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-scala_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + + org.apache.flink + flink-connector-hive_2.11 + ${flink.version} + provided + + + + org.apache.flink + flink-connector-hive_2.11 + ${flink.version} + tests + test + + + + + org.apache.flink + flink-hadoop-compatibility_${scala.binary.version} + ${flink.version} + provided + + + + org.apache.ivy + 
ivy + 2.4.0 + + + + oro + + oro + 2.0.8 + + + + + com.google.code.gson + gson + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + com.mashape.unirest + unirest-java + 1.4.9 + + + + org.apache.flink + flink-connector-hive_${scala.binary.version} + ${flink.version} + provided + + + org.apache.hive + hive-metastore + + + org.apache.hive + hive-exec + + + + + org.apache.flink + flink-connector-hive_${scala.binary.version} + ${flink.version} + tests + test + + + org.apache.hive + hive-exec + + + org.apache.hive + hive-metastore + + + + + + org.apache.flink + flink-table-planner-blink_2.11 + ${flink.version} + provided + + + org.reflections + reflections + + + + + + org.apache.flink + flink-table-planner_2.11 + ${flink.version} + provided + + + + org.apache.hadoop + hadoop-common + ${flink.hadoop.version} + provided + + + + org.apache.hadoop + hadoop-hdfs + ${flink.hadoop.version} + provided + + + + org.apache.hadoop + hadoop-yarn-common + ${flink.hadoop.version} + provided + + + + org.apache.hadoop + hadoop-yarn-client + ${flink.hadoop.version} + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${flink.hadoop.version} + provided + + + + org.mockito + mockito-core + test + + + + org.powermock + powermock-api-mockito + test + + + + org.powermock + powermock-module-junit4 + test + + + + org.apache.hive + hive-metastore + ${hive.version} + provided + + + hadoop-auth + org.apache.hadoop + + + com.google.guava + guava + + + io.netty + netty + + + io.netty + netty-all + + + com.google.protobuf + protobuf-java + + + + + + org.apache.hive + hive-exec + ${hive.version} + provided + + + org.apache.calcite + calcite-core + + + org.apache.calcite + calcite-druid + + + org.apache.calcite.avatica + avatica + + + commons-codec + commons-codec + + + commons-httpclient + commons-httpclient + + + commons-io + commons-io + + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.slf4j + slf4j-api + + + org.apache.zookeeper + zookeeper + + + org.apache.curator + curator-framework + + + org.apache.curator + apache-curator + + + com.google.code.gson + gson + + + jline + jline + + + com.google.guava + guava + + + io.netty + netty + + + io.netty + netty-all + + + com.google.protobuf + protobuf-java + + + + + + org.apache.hive.hcatalog + hive-webhcat-java-client + ${hive.version} + test + + + org.apache.calcite + * + + + com.google.guava + guava + + + io.netty + netty + + + javax.jms + jms + + + + + + org.apache.hive + hive-contrib + ${hive.version} + test + + + + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive.version} + test + test-jar + + + jline + jline + + + com.google.guava + guava + + + io.netty + netty + + + io.netty + netty-all + + + + + + net.jodah + concurrentunit + 0.4.4 + test + + + + com.klarna + hiverunner + 4.0.0 + test + + + com.google.guava + guava + + + + + + net.jodah + concurrentunit + 0.4.4 + test + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + ${scala.version} + + + + -unchecked + -deprecation + -feature + -target:jvm-1.8 + + + -Xms1024m + -Xmx1024m + -XX:MaxMetaspaceSize=${MaxMetaspace} + + + -source + ${java.version} + -target + ${java.version} + -Xlint:all,-serial,-path,-options + + + + + + + 
com.googlecode.maven-download-plugin + download-maven-plugin + + + download-flink-files + validate + + wget + + + 60000 + 5 + ${flink.bin.download.url} + true + ${project.build.directory} + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + false + always + 1 + false + + -Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true + + + + + ${project.build.directory}/flink-${flink.version} + ${project.build.directory}/test-classes + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + **/*.scala + **/*.java + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + maven-enforcer-plugin + + + maven-dependency-plugin + + + maven-resources-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + *:* + + org/datanucleus/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + org.scala-lang:scala-reflect + org.apache.flink:* + + + + + + + reference.conf + + + + + io.netty + org.apache.zeppelin.shaded.io.netty + + + com.google + org.apache.zeppelin.shaded.com.google + + + ${project.basedir}/../../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar + + + + package + + shade + + + + + + + maven-resources-plugin + + + copy-interpreter-setting + package + + resources + + + ${project.build.directory}/../../../interpreter/${interpreter.name} + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + true + + + + + + + + + + flink-1.10 + + 1.10.0 + + + + + flink-1.11 + + 1.11-SNAPSHOT + + + + + hive2 + + true + + + 2.3.4 + 4.0.0 + + + + + hive1 + + 1.2.1 + 3.2.1 + + + + org.apache.hadoop + hadoop-common + 2.7.5 + provided + + + + + + + diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java similarity index 97% rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java index dab45247109..f39718736f4 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.flink; import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.scheduler.Scheduler; diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java similarity index 96% rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index f02c21b8ddd..244559fc47a 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -21,7 +21,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -127,7 +126,7 @@ StreamExecutionEnvironment getStreamExecutionEnvironment() { return this.innerIntp.getStreamExecutionEnvironment(); } - StreamTableEnvironment getStreamTableEnvironment() { + TableEnvironment getStreamTableEnvironment() { return this.innerIntp.getStreamTableEnvironment("blink"); } @@ -178,6 +177,10 @@ public FlinkScalaInterpreter getInnerIntp() { return this.innerIntp; } + public FlinkShims getFlinkShims() { + return this.innerIntp.getFlinkShims(); + } + public void setSavePointIfNecessary(InterpreterContext context) { this.innerIntp.setSavePointIfNecessary(context); } @@ -186,4 +189,7 @@ public void setParallelismIfNecessary(InterpreterContext context) { this.innerIntp.setParallelismIfNecessary(context); } + public FlinkVersion getFlinkVersion() { + return this.innerIntp.getFlinkVersion(); + } } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java similarity index 95% rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java index 2332704f3dd..1e8e8033ffe 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java @@ -166,6 +166,7 @@ public InterpreterResult interpret(String st, ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader()); + flinkInterpreter.createPlannerAgain(); flinkInterpreter.setParallelismIfNecessary(context); flinkInterpreter.setSavePointIfNecessary(context); return runSqlList(st, context); @@ -181,7 +182,9 @@ private InterpreterResult runSqlList(String st, InterpreterContext context) { paragraphTableConfigMap.put(context.getParagraphId(), tableConfig); try { + boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); List sqls = sqlSplitter.splitSql(st); + boolean isFirstInsert = true; for (String sql : sqls) { Optional sqlCommand = SqlCommandParser.parse(sql); if (!sqlCommand.isPresent()) { @@ -194,6 +197,13 @@ private InterpreterResult runSqlList(String st, InterpreterContext context) { return new InterpreterResult(InterpreterResult.Code.ERROR); } try { + if (sqlCommand.get().command == SqlCommand.INSERT_INTO || + sqlCommand.get().command == SqlCommand.INSERT_OVERWRITE) { + if (isFirstInsert && runAsOne) { + flinkInterpreter.getFlinkShims().startMultipleInsert(tbenv, context); + isFirstInsert = false; + } + } callCommand(sqlCommand.get(), context); context.out.flush(); } catch (Throwable e) { @@ -210,12 +220,12 @@ private InterpreterResult runSqlList(String st, InterpreterContext context) { } } - boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); if (runAsOne) { try { lock.lock(); - this.tbenv.execute(st); - 
context.out.write("Insertion successfully.\n"); + if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) { + context.out.write("Insertion successfully.\n"); + } } catch (Exception e) { LOGGER.error("Fail to execute sql as one job", e); return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); @@ -225,6 +235,9 @@ private InterpreterResult runSqlList(String st, InterpreterContext context) { } } } + } catch(Exception e) { + LOGGER.error("Fail to execute sql", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); } finally { // reset parallelism this.tbenv.getConfig().getConfiguration() @@ -516,11 +529,13 @@ public void callInsertInto(String sql, this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue()); } - this.tbenv.sqlUpdate(sql); boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); if (!runAsOne) { + this.tbenv.sqlUpdate(sql); this.tbenv.execute(sql); context.out.write("Insertion successfully.\n"); + } else { + flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context); } } catch (Exception e) { throw new IOException(e); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java similarity index 92% rename from flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java index f4d231996a7..ab05526caa1 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java @@ -18,12 +18,9 @@ package org.apache.zeppelin.flink; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob; -import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob; import org.apache.zeppelin.flink.sql.AppendStreamSqlJob; +import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob; +import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -70,7 +67,8 @@ public void callInnerSelect(String sql, InterpreterContext context) throws IOExc tbenv, flinkInterpreter.getJobManager(), context, - flinkInterpreter.getDefaultParallelism()); + flinkInterpreter.getDefaultParallelism(), + flinkInterpreter.getFlinkShims()); streamJob.run(sql); } else if (streamType.equalsIgnoreCase("append")) { AppendStreamSqlJob streamJob = new AppendStreamSqlJob( @@ -78,7 +76,8 @@ public void callInnerSelect(String sql, InterpreterContext context) throws IOExc flinkInterpreter.getStreamTableEnvironment(), flinkInterpreter.getJobManager(), context, - flinkInterpreter.getDefaultParallelism()); + flinkInterpreter.getDefaultParallelism(), + flinkInterpreter.getFlinkShims()); streamJob.run(sql); } else if (streamType.equalsIgnoreCase("update")) { UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob( @@ -86,7 +85,8 @@ public void callInnerSelect(String sql, InterpreterContext context) throws IOExc flinkInterpreter.getStreamTableEnvironment(), 
flinkInterpreter.getJobManager(), context, - flinkInterpreter.getDefaultParallelism()); + flinkInterpreter.getDefaultParallelism(), + flinkInterpreter.getFlinkShims()); streamJob.run(sql); } else { throw new IOException("Unrecognized stream type: " + streamType); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java similarity index 100% rename from flink/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java similarity index 96% rename from flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index 7ffafb244c8..3f7e65f7b2f 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -70,7 +70,7 @@ public ZeppelinContext buildZeppelinContext() { protected Map setupKernelEnv() throws IOException { Map envs = super.setupKernelEnv(); String pythonPath = envs.getOrDefault("PYTHONPATH", ""); - String pyflinkPythonPath = PyFlinkInterpreter.getPyFlinkPythonPath(properties); + String pyflinkPythonPath = flinkInterpreter.getFlinkShims().getPyFlinkPythonPath(properties); envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath); return envs; } @@ -142,6 +142,10 @@ public int getProgress(InterpreterContext context) throws InterpreterException { return flinkInterpreter.getProgress(context); } + public boolean isFlink110() { + return flinkInterpreter.getFlinkVersion().isFlink110(); + } + public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() { return flinkInterpreter.getExecutionEnvironment().getJavaEnv(); } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java similarity index 100% rename from flink/src/main/java/org/apache/zeppelin/flink/JobManager.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java similarity index 88% rename from flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java index ace08cbd6dd..40d1d112793 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java @@ -17,12 +17,11 @@ package org.apache.zeppelin.flink; -import org.apache.flink.python.util.ResourceUtil; -import org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.flink.table.api.TableEnvironment; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.zeppelin.python.IPythonInterpreter; import org.apache.zeppelin.python.PythonInterpreter; import org.slf4j.Logger; @@ -33,7 +32,6 @@ import 
java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; -import java.nio.file.Files; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -164,32 +162,11 @@ public void cancel(InterpreterContext context) throws InterpreterException { protected Map setupPythonEnv() throws IOException { Map envs = super.setupPythonEnv(); String pythonPath = envs.getOrDefault("PYTHONPATH", ""); - String pyflinkPythonPath = getPyFlinkPythonPath(properties); + String pyflinkPythonPath = flinkInterpreter.getFlinkShims().getPyFlinkPythonPath(properties); envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath); return envs; } - public static String getPyFlinkPythonPath(Properties properties) throws IOException { - String flinkHome = System.getenv("FLINK_HOME"); - if (flinkHome != null) { - File tmpDir = Files.createTempDirectory("zeppelin").toFile(); - List depFiles = null; - try { - depFiles = ResourceUtil.extractBuiltInDependencies(tmpDir.getAbsolutePath(), "pyflink", true); - } catch (InterruptedException e) { - throw new IOException(e); - } - StringBuilder builder = new StringBuilder(); - for (File file : depFiles) { - LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath()); - builder.append(file.getAbsolutePath() + ":"); - } - return builder.toString(); - } else { - throw new IOException("No FLINK_HOME is specified"); - } - } - @Override protected IPythonInterpreter getIPythonInterpreter() throws InterpreterException { return getInterpreterInTheSameSessionByClassName(IPyFlinkInterpreter.class, false); @@ -213,6 +190,10 @@ public int getProgress(InterpreterContext context) throws InterpreterException { return flinkInterpreter.getProgress(context); } + public boolean isFlink110() { + return flinkInterpreter.getFlinkVersion().isFlink110(); + } + public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() { return flinkInterpreter.getExecutionEnvironment().getJavaEnv(); } diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java new file mode 100644 index 00000000000..75d06bbcf93 --- /dev/null +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.ComponentFactoryService; +import org.apache.flink.table.module.ModuleManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.Map; + +/** + * Factory class for creating flink table env for different purpose: + * 1. java/scala + * 2. stream table / batch table + * 3. flink planner / blink planner + * + */ +public class TableEnvFactory { + + private static Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class); + + private FlinkVersion flinkVersion; + private FlinkShims flinkShims; + private Executor executor; + private org.apache.flink.api.scala.ExecutionEnvironment benv; + private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv; + private TableConfig tblConfig; + private CatalogManager catalogManager; + private ModuleManager moduleManager; + private FunctionCatalog flinkFunctionCatalog; + private FunctionCatalog blinkFunctionCatalog; + + public TableEnvFactory(FlinkVersion flinkVersion, + FlinkShims flinkShims, + org.apache.flink.api.scala.ExecutionEnvironment env, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv, + TableConfig tblConfig, + CatalogManager catalogManager, + ModuleManager moduleManager, + FunctionCatalog flinkFunctionCatalog, + FunctionCatalog blinkFunctionCatalog) { + this.flinkVersion = flinkVersion; + this.flinkShims = flinkShims; + this.benv = env; + this.senv = senv; + this.tblConfig = tblConfig; + this.catalogManager = catalogManager; + this.moduleManager = moduleManager; + this.flinkFunctionCatalog = flinkFunctionCatalog; + this.blinkFunctionCatalog = blinkFunctionCatalog; + } + + public TableEnvironment createScalaFlinkBatchTableEnvironment() { + try { + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl"); + } + Constructor constructor = clazz + .getConstructor( + org.apache.flink.api.scala.ExecutionEnvironment.class, + TableConfig.class, + CatalogManager.class, + ModuleManager.class); + + return (TableEnvironment) + constructor.newInstance(benv, tblConfig, catalogManager, moduleManager); + } catch (Exception e) { + throw new TableException("Fail to createScalaFlinkBatchTableEnvironment", e); + } + } + + public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) { + try { + Map executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); + + Map plannerProperties = settings.toPlannerProperties(); 
+ Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create( + plannerProperties, + executor, + tblConfig, + flinkFunctionCatalog, + catalogManager); + + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); + } + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + flinkFunctionCatalog, + tblConfig, + senv, + planner, + executor, + settings.isStreamingMode()); + + } catch (Exception e) { + throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e); + } + } + + public TableEnvironment createJavaFlinkBatchTableEnvironment() { + try { + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl"); + } + + Constructor con = clazz.getConstructor( + ExecutionEnvironment.class, + TableConfig.class, + CatalogManager.class, + ModuleManager.class); + return (TableEnvironment) con.newInstance( + benv.getJavaEnv(), + tblConfig, + catalogManager, + moduleManager); + } catch (Throwable t) { + throw new TableException("Create BatchTableEnvironment failed.", t); + } + } + + public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) { + + try { + Map executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); + + Map plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, tblConfig, flinkFunctionCatalog, catalogManager); + + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); + } + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + flinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode()); + + } catch (Exception e) { + throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e); + } + } + + public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) { + + try { + Map executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); + + Map plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create( + plannerProperties, + executor, + 
tblConfig, + blinkFunctionCatalog, + catalogManager); + + + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); + } + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv, + planner, + executor, + settings.isStreamingMode()); + } catch (Exception e) { + throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e); + } + } + + public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings) { + + try { + Map executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); + + Map plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager); + + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); + } + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode()); + } catch (Exception e) { + throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e); + } + } + + public TableEnvironment createJavaBlinkBatchTableEnvironment( + EnvironmentSettings settings) { + try { + final Map executorProperties = settings.toExecutorProperties(); + executor = lookupExecutor(executorProperties, senv.getJavaEnv()); + final Map plannerProperties = settings.toPlannerProperties(); + final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager); + + Class clazz = null; + if (flinkVersion.isFlink110()) { + clazz = Class + .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl"); + } else { + clazz = Class + .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); + } + Constructor constructor = clazz.getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance( + catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode()); + } catch (Exception e) { + LOGGER.info(ExceptionUtils.getStackTrace(e)); + throw new 
TableException("Fail to createJavaBlinkBatchTableEnvironment", e); + } + } + + + public void createPlanner(EnvironmentSettings settings) { + Map executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); + + Map plannerProperties = settings.toPlannerProperties(); + ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create( + plannerProperties, + executor, + tblConfig, + blinkFunctionCatalog, + catalogManager); + } + + private static Executor lookupExecutor( + Map executorProperties, + StreamExecutionEnvironment executionEnvironment) { + try { + ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass() + .getMethod("create", Map.class, StreamExecutionEnvironment.class); + + return (Executor) createMethod.invoke( + executorFactory, + executorProperties, + executionEnvironment); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. Make sure a planner module is on the classpath", + e); + } + } +} diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java similarity index 90% rename from flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index fd471752d9e..2d98ef7a059 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -26,15 +26,15 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.streaming.experimental.SocketStreamIterator; -import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.types.Row; +import org.apache.zeppelin.flink.FlinkShims; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,18 +64,21 @@ public abstract class AbstractStreamSqlJob { protected Object resultLock = new Object(); protected volatile boolean enableToRefresh = true; protected int defaultParallelism; + protected FlinkShims flinkShims; protected ScheduledExecutorService refreshScheduler = Executors.newScheduledThreadPool(1); public AbstractStreamSqlJob(StreamExecutionEnvironment senv, TableEnvironment stenv, JobManager jobManager, InterpreterContext context, - int defaultParallelism) { + int defaultParallelism, + FlinkShims flinkShims) { this.senv = senv; this.stenv = stenv; this.jobManager = jobManager; this.context = context; this.defaultParallelism = defaultParallelism; + this.flinkShims = flinkShims; } private static TableSchema removeTimeAttributes(TableSchema schema) { @@ -97,7 +100,8 @@ private static TableSchema removeTimeAttributes(TableSchema schema) { public String run(String st) throws IOException 
{ Table table = stenv.sqlQuery(st); - String tableName = "UnnamedTable_" + st + "_" + SQL_INDEX.getAndIncrement(); + String tableName = "UnnamedTable_" + + "_" + SQL_INDEX.getAndIncrement(); return run(table, tableName); } @@ -125,9 +129,10 @@ public String run(Table table, String tableName) throws IOException { // pass binding address and port such that sink knows where to send to LOGGER.debug("Collecting data at address: " + iterator.getBindAddress() + ":" + iterator.getPort()); - CollectStreamTableSink collectTableSink = - new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); - collectTableSink = collectTableSink.configure( + RetractStreamTableSink collectTableSink = + (RetractStreamTableSink) flinkShims.getCollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); + // new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); + collectTableSink = (RetractStreamTableSink) collectTableSink.configure( outputType.getFieldNames(), outputType.getFieldTypes()); // workaround, otherwise it won't find the sink properly @@ -136,8 +141,8 @@ public String run(Table table, String tableName) throws IOException { try { stenv.useCatalog("default_catalog"); stenv.useDatabase("default_database"); - stenv.registerTableSink(tableName, collectTableSink); - table.insertInto(new StreamQueryConfig(), tableName); + flinkShims.registerTableSink(stenv, tableName, collectTableSink); + table.insertInto(tableName); } finally { stenv.useCatalog(originalCatalog); stenv.useDatabase(originalDatabase); @@ -205,6 +210,7 @@ public void run() { try { while (isRunning && iterator.hasNext()) { final Tuple2 change = iterator.next(); + LOGGER.info(change.f0 + ", " + change.f1); processRecord(change); } } catch (Throwable e) { diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java similarity index 93% rename from flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java index ba5b4fe291f..f1eb99772ff 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java @@ -19,10 +19,11 @@ package org.apache.zeppelin.flink.sql; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.StringUtils; +import org.apache.zeppelin.flink.FlinkShims; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.tabledata.TableDataUtils; @@ -42,11 +43,12 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { private long tsWindowThreshold; public AppendStreamSqlJob(StreamExecutionEnvironment senv, - StreamTableEnvironment stEnv, + TableEnvironment stEnv, JobManager jobManager, InterpreterContext context, - int defaultParallelism) { - super(senv, stEnv, jobManager, context, defaultParallelism); + int defaultParallelism, + FlinkShims flinkShims) { + super(senv, stEnv, jobManager, context, defaultParallelism, flinkShims); this.tsWindowThreshold = Long.parseLong(context.getLocalProperties() 
.getOrDefault("threshold", 1000 * 60 * 60 + "")); } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java similarity index 93% rename from flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 3c3125f35b3..1c30b63bb61 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; +import org.apache.zeppelin.flink.FlinkShims; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.tabledata.TableDataUtils; @@ -40,8 +41,9 @@ public SingleRowStreamSqlJob(StreamExecutionEnvironment senv, TableEnvironment stenv, JobManager jobManager, InterpreterContext context, - int defaultParallelism) { - super(senv, stenv, jobManager, context, defaultParallelism); + int defaultParallelism, + FlinkShims flinkShims) { + super(senv, stenv, jobManager, context, defaultParallelism, flinkShims); this.template = context.getLocalProperties().getOrDefault("template", "{0}"); } diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java similarity index 100% rename from flink/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java similarity index 90% rename from flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java rename to flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java index 0353d89e792..805afeeef9b 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java @@ -19,9 +19,10 @@ package org.apache.zeppelin.flink.sql; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; -import org.apache.flink.table.api.scala.StreamTableEnvironment; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.StringUtils; +import org.apache.zeppelin.flink.FlinkShims; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.tabledata.TableDataUtils; @@ -40,11 +41,12 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob { private List lastSnapshot = new ArrayList<>(); public UpdateStreamSqlJob(StreamExecutionEnvironment senv, - StreamTableEnvironment stEnv, + TableEnvironment stEnv, JobManager jobManager, InterpreterContext context, - int defaultParallelism) { - super(senv, stEnv, jobManager, context, defaultParallelism); + int defaultParallelism, + FlinkShims flinkShims) { + super(senv, stEnv, jobManager, context, defaultParallelism, flinkShims); } @Override @@ -64,6 +66,7 @@ protected void 
processDelete(Row row) { LOGGER.debug("processDelete: " + row.toString()); for (int i = 0; i < materializedTable.size(); i++) { if (materializedTable.get(i).equals(row)) { + LOGGER.debug("real processDelete: " + row.toString()); materializedTable.remove(i); break; } diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json similarity index 100% rename from flink/src/main/resources/interpreter-setting.json rename to flink/interpreter/src/main/resources/interpreter-setting.json diff --git a/flink/src/main/resources/python/zeppelin_ipyflink.py b/flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py similarity index 78% rename from flink/src/main/resources/python/zeppelin_ipyflink.py rename to flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py index fe94c9f1549..50efbeae803 100644 --- a/flink/src/main/resources/python/zeppelin_ipyflink.py +++ b/flink/interpreter/src/main/resources/python/zeppelin_ipyflink.py @@ -46,11 +46,18 @@ pyflink.java_gateway.install_exception_handler() b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) -bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) -bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment()) -st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) -st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) + +if intp.isFlink110(): + bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) + bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) + st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) +else: + bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) + bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink")) class IPyFlinkZeppelinContext(PyZeppelinContext): diff --git a/flink/src/main/resources/python/zeppelin_pyflink.py b/flink/interpreter/src/main/resources/python/zeppelin_pyflink.py similarity index 76% rename from flink/src/main/resources/python/zeppelin_pyflink.py rename to flink/interpreter/src/main/resources/python/zeppelin_pyflink.py index 8a401b26874..542ab8ffb0f 100644 --- a/flink/src/main/resources/python/zeppelin_pyflink.py +++ b/flink/interpreter/src/main/resources/python/zeppelin_pyflink.py @@ -35,11 +35,18 @@ pyflink.java_gateway.install_exception_handler() b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) -bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) -bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment()) -st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) -st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) + +if intp.isFlink110(): + bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) + bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) + st_env = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) + st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) +else: + bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) + bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink")) from zeppelin_context import PyZeppelinContext diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala similarity index 100% rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkExprTyper.scala diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala similarity index 100% rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoopInterpreter.scala diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala similarity index 81% rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 625a8edfad9..51125262b31 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit import java.util.jar.JarFile import org.apache.commons.lang3.StringUtils +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.scala.FlinkShell.{ExecutionMode, _} import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop} @@ -36,12 +37,11 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment} import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment} import org.apache.flink.runtime.jobgraph.SavepointConfigOptions +import org.apache.flink.runtime.util.EnvironmentInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl -import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment} -import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction} import org.apache.flink.table.module.ModuleManager @@ -82,11 +82,11 @@ class FlinkScalaInterpreter(val properties: Properties) { // 
TableEnvironment of blink planner private var btenv: TableEnvironment = _ - private var stenv: StreamTableEnvironment = _ + private var stenv: TableEnvironment = _ // TableEnvironment of flink planner - private var btenv_2: BatchTableEnvironment = _ - private var stenv_2: StreamTableEnvironment = _ + private var btenv_2: TableEnvironment = _ + private var stenv_2: TableEnvironment = _ // PyFlink depends on java version of TableEnvironment, // so need to create java version of TableEnvironment @@ -97,13 +97,52 @@ class FlinkScalaInterpreter(val properties: Properties) { private var java_stenv_2: TableEnvironment = _ private var z: FlinkZeppelinContext = _ + private var flinkVersion: FlinkVersion = _ + private var flinkShims: FlinkShims = _ private var jmWebUrl: String = _ private var jobManager: JobManager = _ - private var defaultParallelism = 1; - private var defaultSqlParallelism = 1; + private var defaultParallelism = 1 + private var defaultSqlParallelism = 1 private var userJars: Seq[String] = _ def open(): Unit = { + val config = initFlinkConfig() + createFlinkILoop(config) + createTableEnvs() + setTableEnvConfig() + + // init ZeppelinContext + this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(), + Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000"))) + val modifiers = new java.util.ArrayList[String]() + modifiers.add("@transient") + this.bind("z", z.getClass().getCanonicalName(), z, modifiers); + + this.jobManager = new JobManager(this.z, jmWebUrl) + + // register JobListener + val jobListener = new FlinkJobListener() + this.benv.registerJobListener(jobListener) + this.senv.registerJobListener(jobListener) + + // register hive catalog + if (properties.getProperty("zeppelin.flink.enableHive", "false").toBoolean) { + LOGGER.info("Hive is enabled, registering hive catalog.") + registerHiveCatalog() + } else { + LOGGER.info("Hive is disabled.") + } + + // load udf jar + val udfJars = properties.getProperty("flink.udf.jars", "") + if (!StringUtils.isBlank(udfJars)) { + udfJars.split(",").foreach(jar => { + loadUDFJar(jar) + }) + } + } + + private def initFlinkConfig(): Config = { val flinkHome = properties.getProperty("FLINK_HOME", sys.env.getOrElse("FLINK_HOME", "")) val flinkConfDir = properties.getProperty("FLINK_CONF_DIR", sys.env.getOrElse("FLINK_CONF_DIR", "")) val hadoopConfDir = properties.getProperty("HADOOP_CONF_DIR", sys.env.getOrElse("HADOOP_CONF_DIR", "")) @@ -115,6 +154,10 @@ class FlinkScalaInterpreter(val properties: Properties) { LOGGER.info("YARN_CONF_DIR: " + yarnConfDir) LOGGER.info("HIVE_CONF_DIR: " + hiveConfDir) + this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion) + LOGGER.info("Using flink: " + flinkVersion) + this.flinkShims = FlinkShims.getInstance(flinkVersion, properties) + this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir) mode = ExecutionMode.withName( @@ -180,6 +223,10 @@ class FlinkScalaInterpreter(val properties: Properties) { .copy(port = Some(Integer.parseInt(port))) } + config + } + + private def createFlinkILoop(config: Config): Unit = { val printReplOutput = properties.getProperty("zeppelin.flink.printREPLOutput", "true").toBoolean val replOut = if (printReplOutput) { new JPrintWriter(interpreterOutput, true) @@ -234,6 +281,10 @@ class FlinkScalaInterpreter(val properties: Properties) { Thread.currentThread().setContextClassLoader(getFlinkClassLoader) val repl = new FlinkILoop(configuration, config.externalJars, None, replOut) (repl, cluster) + } catch { + case e: 
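[Editor's note] initFlinkConfig above obtains the shim via FlinkShims.getInstance(flinkVersion, properties); the loader itself is outside this excerpt. A hedged sketch of how such version-dispatched loading might look, where the concrete shim class names and constructor signature are assumptions (the pom only shows flink1.10-shims and flink1.11-shims modules):

    // Hypothetical sketch of shim selection by reflection; not the patch's loader.
    import java.lang.reflect.Constructor;
    import java.util.Properties;

    public class FlinkShimsLoaderSketch {
      public static Object loadShims(boolean isFlink110, Properties properties) throws Exception {
        // Pick the shim implementation matching the Flink version found on the classpath.
        String shimsClassName = isFlink110
            ? "org.apache.zeppelin.flink.Flink110Shims"
            : "org.apache.zeppelin.flink.Flink111Shims";
        Class<?> shimsClass = Class.forName(shimsClassName);
        Constructor<?> constructor = shimsClass.getConstructor(Properties.class);
        return constructor.newInstance(properties);
      }
    }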
Exception => + LOGGER.error(ExceptionUtils.getStackTrace(e)) + throw e } finally { Thread.currentThread().setContextClassLoader(classLoader) } @@ -241,6 +292,7 @@ class FlinkScalaInterpreter(val properties: Properties) { this.flinkILoop = iLoop this.cluster = cluster + val settings = new Settings() settings.usejavacp.value = true settings.Yreplsync.value = true @@ -259,6 +311,42 @@ class FlinkScalaInterpreter(val properties: Properties) { // set execution environment flinkILoop.intp.bind("benv", flinkILoop.scalaBenv) flinkILoop.intp.bind("senv", flinkILoop.scalaSenv) + + val packageImports = Seq[String]( + "org.apache.flink.core.fs._", + "org.apache.flink.core.fs.local._", + "org.apache.flink.api.common.io._", + "org.apache.flink.api.common.aggregators._", + "org.apache.flink.api.common.accumulators._", + "org.apache.flink.api.common.distributions._", + "org.apache.flink.api.common.operators._", + "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint", + "org.apache.flink.api.common.functions._", + "org.apache.flink.api.java.io._", + "org.apache.flink.api.java.aggregation._", + "org.apache.flink.api.java.functions._", + "org.apache.flink.api.java.operators._", + "org.apache.flink.api.java.sampling._", + "org.apache.flink.api.scala._", + "org.apache.flink.api.scala.utils._", + "org.apache.flink.streaming.api.scala._", + "org.apache.flink.streaming.api.windowing.time._", + "org.apache.flink.types.Row" + ) + + flinkILoop.intp.interpret("import " + packageImports.mkString(", ")) + + if (flinkVersion.isFlink110) { + flinkILoop.intp.interpret("import org.apache.flink.table.api.scala._") + } else { + flinkILoop.intp.interpret("import org.apache.flink.table.api._") + flinkILoop.intp.interpret("import org.apache.flink.table.api.bridge.scala._") + } + + flinkILoop.intp.interpret("import org.apache.flink.table.functions.ScalarFunction") + flinkILoop.intp.interpret("import org.apache.flink.table.functions.AggregateFunction") + flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableFunction") + flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableAggregateFunction") } val in0 = getField(flinkILoop, "scala$tools$nsc$interpreter$ILoop$$in0") @@ -280,43 +368,47 @@ class FlinkScalaInterpreter(val properties: Properties) { this.senv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)) setAsContext() + } + private def createTableEnvs(): Unit = { val originalClassLoader = Thread.currentThread().getContextClassLoader try { Thread.currentThread().setContextClassLoader(getFlinkClassLoader) val tblConfig = new TableConfig tblConfig.getConfiguration.addAll(configuration) // Step 1.1 Initialize the CatalogManager if required. - val catalogManager = new CatalogManager("default_catalog", - new GenericInMemoryCatalog("default_catalog", "default_database")); + val catalogManager = flinkShims.createCatalogManager(tblConfig.getConfiguration).asInstanceOf[CatalogManager] // Step 1.2 Initialize the ModuleManager if required. val moduleManager = new ModuleManager(); // Step 1.3 Initialize the FunctionCatalog if required. 
val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager); val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager); - this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig, + this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, this.benv, this.senv, tblConfig, catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog) + val modifiers = new java.util.ArrayList[String]() + modifiers.add("@transient") + // blink planner var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build() this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting); - flinkILoop.intp.bind("btenv", this.btenv.asInstanceOf[StreamTableEnvironmentImpl]) + flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient")) this.java_btenv = this.btenv var stEnvSetting = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build() this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting) - flinkILoop.intp.bind("stenv", this.stenv) + flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting) // flink planner this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() - flinkILoop.intp.bind("btenv_2", this.btenv_2) + flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient")) stEnvSetting = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build() this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting) - flinkILoop.intp.bind("stenv_2", this.stenv_2) + flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient")) this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build @@ -324,7 +416,9 @@ class FlinkScalaInterpreter(val properties: Properties) { } finally { Thread.currentThread().setContextClassLoader(originalClassLoader) } + } + private def setTableEnvConfig(): Unit = { this.properties.asScala.filter(e => e._1.startsWith("table.exec")) .foreach(e => { this.btenv.getConfig.getConfiguration.setString(e._1, e._2) @@ -347,97 +441,24 @@ class FlinkScalaInterpreter(val properties: Properties) { this.benv.getConfig.disableSysoutLogging() this.senv.getConfig.disableSysoutLogging() } + } - flinkILoop.interpret("import org.apache.flink.api.scala._") - flinkILoop.interpret("import org.apache.flink.table.api.scala._") - flinkILoop.interpret("import org.apache.flink.types.Row") - flinkILoop.interpret("import org.apache.flink.table.functions.ScalarFunction") - flinkILoop.interpret("import org.apache.flink.table.functions.AggregateFunction") - flinkILoop.interpret("import org.apache.flink.table.functions.TableFunction") - - this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(), - Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000"))) - val modifiers = new java.util.ArrayList[String]() - modifiers.add("@transient") - this.bind("z", z.getClass().getCanonicalName(), z, modifiers); - - this.jobManager = new JobManager(this.z, jmWebUrl) - - val jobListener = new JobListener { - override def onJobSubmitted(jobClient: JobClient, e: Throwable): Unit = { - if (e != null) { - LOGGER.warn("Fail to submit job") - } else { - if (InterpreterContext.get() == 
null) { - LOGGER.warn("Job {} is submitted but unable to associate this job to paragraph, " + - "as InterpreterContext is null", jobClient.getJobID) - } else { - LOGGER.info("Job {} is submitted for paragraph {}", Array(jobClient.getJobID, - InterpreterContext.get().getParagraphId): _ *) - jobManager.addJob(InterpreterContext.get(), jobClient) - if (jmWebUrl != null) { - jobManager.sendFlinkJobUrl(InterpreterContext.get()); - } else { - LOGGER.error("Unable to link JobURL, because JobManager weburl is null") - } - } - } - } - - override def onJobExecuted(jobExecutionResult: JobExecutionResult, e: Throwable): Unit = { - if (e != null) { - LOGGER.warn("Fail to execute job") - } else { - LOGGER.info("Job {} is executed with time {} seconds", jobExecutionResult.getJobID, - jobExecutionResult.getNetRuntime(TimeUnit.SECONDS)) - } - if (InterpreterContext.get() != null) { - jobManager.removeJob(InterpreterContext.get().getParagraphId) - } else { - if (e == null) { - LOGGER.warn("Unable to remove this job {}, as InterpreterContext is null", - jobExecutionResult.getJobID) - } - } - } - } - - this.benv.registerJobListener(jobListener) - this.senv.registerJobListener(jobListener) - - // register hive catalog - if (properties.getProperty("zeppelin.flink.enableHive", "false").toBoolean) { - LOGGER.info("Hive is enabled, registering hive catalog.") - val hiveConfDir = - properties.getOrDefault("HIVE_CONF_DIR", System.getenv("HIVE_CONF_DIR")).toString - if (hiveConfDir == null) { - throw new InterpreterException("HIVE_CONF_DIR is not specified"); - } - val database = properties.getProperty("zeppelin.flink.hive.database", "default") - if (database == null) { - throw new InterpreterException("default database is not specified, " + - "please set zeppelin.flink.hive.database") - } - val hiveVersion = properties.getProperty("zeppelin.flink.hive.version", "2.3.4") - val hiveCatalog = new HiveCatalog("hive", database, hiveConfDir, hiveVersion) - this.btenv.registerCatalog("hive", hiveCatalog) - this.btenv.useCatalog("hive") - this.btenv.useDatabase("default") - this.btenv.loadModule("hive", new HiveModule(hiveVersion)) - } else { - LOGGER.info("Hive is disabled.") - } - - // load udf jar - val udfJars = properties.getProperty("flink.udf.jars", "") - if (!StringUtils.isBlank(udfJars)) { - udfJars.split(",").foreach(jar => { - loadUDFJar(jar) - }) + private def registerHiveCatalog(): Unit = { + val hiveConfDir = + properties.getOrDefault("HIVE_CONF_DIR", System.getenv("HIVE_CONF_DIR")).toString + if (hiveConfDir == null) { + throw new InterpreterException("HIVE_CONF_DIR is not specified"); } + val database = properties.getProperty("zeppelin.flink.hive.database", "default") + val hiveVersion = properties.getProperty("zeppelin.flink.hive.version", "2.3.4") + val hiveCatalog = new HiveCatalog("hive", database, hiveConfDir, hiveVersion) + this.btenv.registerCatalog("hive", hiveCatalog) + this.btenv.useCatalog("hive") + this.btenv.useDatabase(database) + this.btenv.loadModule("hive", new HiveModule(hiveVersion)) } - def loadUDFJar(jar: String): Unit = { + private def loadUDFJar(jar: String): Unit = { LOGGER.info("Loading UDF Jar: " + jar) val jarFile = new JarFile(jar) val entries = jarFile.entries @@ -459,13 +480,13 @@ class FlinkScalaInterpreter(val properties: Properties) { btenv.registerFunction(c.getSimpleName, scalarUDF) } else if (udf.isInstanceOf[TableFunction[_]]) { val tableUDF = udf.asInstanceOf[TableFunction[_]] - (btenv.asInstanceOf[StreamTableEnvironmentImpl]).registerFunction(c.getSimpleName, tableUDF) 
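[Editor's note] The UDF registrations that follow replace direct casts to StreamTableEnvironmentImpl with shim calls (registerTableFunction, registerAggregateFunction, registerTableAggregateFunction). The shim bodies are not part of this excerpt; a hedged sketch of what a 1.10-style implementation might look like, mirroring the cast the old code performed (class and method names are assumptions):

    // Hypothetical 1.10-style shim for TableFunction registration; not the patch's shim code.
    import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
    import org.apache.flink.table.functions.TableFunction;

    public class TableFunctionShimSketch {
      @SuppressWarnings({"unchecked", "rawtypes"})
      public void registerTableFunction(Object tableEnv, String name, Object tableFunction) {
        // On Flink 1.10 the bridge environment exposes registerFunction(name, TableFunction),
        // so the shim performs the version-specific cast for the interpreter.
        ((StreamTableEnvironmentImpl) tableEnv).registerFunction(name, (TableFunction) tableFunction);
      }
    }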
+ flinkShims.registerTableFunction(btenv, c.getSimpleName, tableUDF) } else if (udf.isInstanceOf[AggregateFunction[_,_]]) { val aggregateUDF = udf.asInstanceOf[AggregateFunction[_,_]] - (btenv.asInstanceOf[StreamTableEnvironmentImpl]).registerFunction(c.getSimpleName, aggregateUDF) + flinkShims.registerAggregateFunction(btenv, c.getSimpleName, aggregateUDF) } else if (udf.isInstanceOf[TableAggregateFunction[_,_]]) { val tableAggregateUDF = udf.asInstanceOf[TableAggregateFunction[_,_]] - (btenv.asInstanceOf[StreamTableEnvironmentImpl]).registerFunction(c.getSimpleName, tableAggregateUDF) + flinkShims.registerTableAggregateFunction(btenv, c.getSimpleName, tableAggregateUDF) } else { LOGGER.warn("No UDF definition found in class file: " + je.getName) } @@ -477,7 +498,7 @@ class FlinkScalaInterpreter(val properties: Properties) { } } - def setAsContext(): Unit = { + private def setAsContext(): Unit = { val streamFactory = new StreamExecutionEnvironmentFactory() { override def createExecutionEnvironment = senv.getJavaEnv } @@ -692,7 +713,7 @@ class FlinkScalaInterpreter(val properties: Properties) { this.btenv_2 } - def getStreamTableEnvironment(planner: String = "blink"): StreamTableEnvironment = { + def getStreamTableEnvironment(planner: String = "blink"): TableEnvironment = { if (planner == "blink") this.stenv else @@ -764,6 +785,49 @@ class FlinkScalaInterpreter(val properties: Properties) { def getFlinkILoop = flinkILoop + def getFlinkShims = flinkShims + + def getFlinkVersion = flinkVersion + + class FlinkJobListener extends JobListener { + + override def onJobSubmitted(jobClient: JobClient, e: Throwable): Unit = { + if (e != null) { + LOGGER.warn("Fail to submit job") + } else { + if (InterpreterContext.get() == null) { + LOGGER.warn("Job {} is submitted but unable to associate this job to paragraph, " + + "as InterpreterContext is null", jobClient.getJobID) + } else { + LOGGER.info("Job {} is submitted for paragraph {}", Array(jobClient.getJobID, + InterpreterContext.get().getParagraphId): _ *) + jobManager.addJob(InterpreterContext.get(), jobClient) + if (jmWebUrl != null) { + jobManager.sendFlinkJobUrl(InterpreterContext.get()); + } else { + LOGGER.error("Unable to link JobURL, because JobManager weburl is null") + } + } + } + } + + override def onJobExecuted(jobExecutionResult: JobExecutionResult, e: Throwable): Unit = { + if (e != null) { + LOGGER.warn("Fail to execute job") + } else { + LOGGER.info("Job {} is executed with time {} seconds", jobExecutionResult.getJobID, + jobExecutionResult.getNetRuntime(TimeUnit.SECONDS)) + } + if (InterpreterContext.get() != null) { + jobManager.removeJob(InterpreterContext.get().getParagraphId) + } else { + if (e == null) { + LOGGER.warn("Unable to remove this job {}, as InterpreterContext is null", + jobExecutionResult.getJobID) + } + } + } + } } diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala similarity index 85% rename from flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala index 146ec63c44c..b3a964a6699 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala @@ -21,12 +21,11 @@ package org.apache.zeppelin.flink import java.io.IOException import 
java.util.concurrent.atomic.AtomicInteger -import com.google.common.collect.Maps import org.apache.flink.api.scala.DataSet import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.internal.TableImpl -import org.apache.flink.table.api.{Table, TableEnvironment, TableUtils} -import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} +import org.apache.flink.table.api.Table +//import org.apache.flink.table.api.scala.BatchTableEnvironment import org.apache.flink.types.Row import org.apache.flink.util.StringUtils import org.apache.zeppelin.annotation.ZeppelinApi @@ -103,13 +102,17 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter, override def showData(obj: Any, maxResult: Int): String = { if (obj.isInstanceOf[DataSet[_]]) { val ds = obj.asInstanceOf[DataSet[_]] - val btenv = flinkInterpreter.getBatchTableEnvironment("flink").asInstanceOf[BatchTableEnvironment] - val table = btenv.fromDataSet(ds) + val btenv = flinkInterpreter.getBatchTableEnvironment("flink")//.asInstanceOf[BatchTableEnvironment] + + val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table] + //btenv.fromDataSet(ds) val columnNames: Array[String] = table.getSchema.getFieldNames - val dsRows: DataSet[Row] = btenv.toDataSet[Row](table) + val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]] + // btenv.toDataSet[Row](table) showTable(columnNames, dsRows.first(maxResult + 1).collect()) } else if (obj.isInstanceOf[Table]) { - val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(obj.asInstanceOf[TableImpl])).toSeq + val rows = JavaConversions.asScalaBuffer( + flinkInterpreter.getFlinkShims.collectToList(obj.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq val columnNames = obj.asInstanceOf[Table].getSchema.getFieldNames showTable(columnNames, rows) } else { @@ -119,35 +122,39 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter, def showFlinkTable(table: Table): String = { val columnNames: Array[String] = table.getSchema.getFieldNames - val dsRows: DataSet[Row] = flinkInterpreter.getJavaBatchTableEnvironment("flink") - .asInstanceOf[BatchTableEnvironment].toDataSet[Row](table) + val btenv = flinkInterpreter.getJavaBatchTableEnvironment("flink") + val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]] showTable(columnNames, dsRows.first(maxResult + 1).collect()) } def showBlinkTable(table: Table): String = { - val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(table.asInstanceOf[TableImpl])).toSeq + val rows = JavaConversions.asScalaBuffer( + flinkInterpreter.getFlinkShims.collectToList(table.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq val columnNames = table.getSchema.getFieldNames showTable(columnNames, rows) } def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = { - val stenv = flinkInterpreter.getStreamTableEnvironment() + val stenv = flinkInterpreter.getJavaStreamTableEnvironment("blink") val context = InterpreterContext.get() configs.foreach(e => context.getLocalProperties.put(e._1, e._2)) val tableName = "UnnamedTable_" + context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement() if (streamType.equalsIgnoreCase("single")) { val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment, - stenv, flinkInterpreter.getJobManager, context, 
flinkInterpreter.getDefaultParallelism) + table.asInstanceOf[TableImpl].getTableEnvironment, + flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims) streamJob.run(table, tableName) } else if (streamType.equalsIgnoreCase("append")) { val streamJob = new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment, - stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism) + table.asInstanceOf[TableImpl].getTableEnvironment, + flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims) streamJob.run(table, tableName) } else if (streamType.equalsIgnoreCase("update")) { val streamJob = new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment, - stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism) + table.asInstanceOf[TableImpl].getTableEnvironment, + flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism, flinkInterpreter.getFlinkShims) streamJob.run(table, tableName) } else throw new IOException("Unrecognized stream type: " + streamType) diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala similarity index 100% rename from flink/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala rename to flink/interpreter/src/main/scala/org/apache/zeppelin/flink/util/DependencyUtils.scala diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java similarity index 97% rename from flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java index 03efa9b2c6a..b3c3ae498ab 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java @@ -107,14 +107,15 @@ public void testSelect() throws InterpreterException, IOException { " return s.upper()", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + context = getInterpreterContext(); result = pyFlinkInterpreter.interpret("bt_env.register_function(\"python_upper\", " + "udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))", - getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertEquals(1, resultMessages.size()); - assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); - assertEquals("add_one\n2\n3\n", resultMessages.get(0).getData()); + context); + assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code()); + // resultMessages = context.out.toInterpreterResultMessage(); + // assertEquals(1, resultMessages.size()); + // assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + // assertEquals("add_one\n2\n3\n", resultMessages.get(0).getData()); // select which use python udf context = getInterpreterContext(); diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java similarity index 100% rename from 
flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java similarity index 100% rename from flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java similarity index 98% rename from flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java index a2c3b796bcc..a2d380fb7b1 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java @@ -171,7 +171,7 @@ public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter " .field(\"a\", DataTypes.BIGINT())\n" + " .field(\"b\", DataTypes.STRING())\n" + " .field(\"c\", DataTypes.STRING())) \\\n" + - " .register_table_sink(\"batch_sink\")\n" + + " .create_temporary_table(\"batch_sink\")\n" + "t.select(\"a + 1, b, c\").insert_into(\"batch_sink\")\n" + "bt_env.execute(\"batch_job\")" , context); @@ -201,7 +201,7 @@ public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter " .field(\"a\", DataTypes.STRING())\n" + " .field(\"b\", DataTypes.BIGINT())\n" + " .field(\"c\", DataTypes.BIGINT())) \\\n" + - " .register_table_sink(\"batch_sink4\")\n" + + " .create_temporary_table(\"batch_sink4\")\n" + "t.group_by(\"c\").select(\"c, sum(a), count(b)\").insert_into(\"batch_sink4\")\n" + "bt_env.execute(\"batch_job4\")" , context); @@ -242,7 +242,7 @@ public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter " .field(\"a\", DataTypes.BIGINT())\n" + " .field(\"b\", DataTypes.STRING())\n" + " .field(\"c\", DataTypes.STRING())) \\\n" + - " .register_table_sink(\"batch_sink3\")\n" + + " .create_temporary_table(\"batch_sink3\")\n" + "t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" + "bt_env.execute(\"batch_job3\")" , context); @@ -311,11 +311,11 @@ public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkS " .field(\"a\", DataTypes.BIGINT())\n" + " .field(\"b\", DataTypes.STRING())\n" + " .field(\"c\", DataTypes.STRING())) \\\n" + - " .register_table_sink(\"stream_sink\")\n" + + " .create_temporary_table(\"stream_sink\")\n" + "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" + "st_env.execute(\"stream_job\")" , context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); } public static void testSingleStreamTableApi(Interpreter interpreter, diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java similarity index 100% rename from flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java diff --git 
a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java similarity index 100% rename from flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java rename to flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java diff --git a/flink/src/test/resources/flink-conf.yaml b/flink/interpreter/src/test/resources/flink-conf.yaml similarity index 100% rename from flink/src/test/resources/flink-conf.yaml rename to flink/interpreter/src/test/resources/flink-conf.yaml diff --git a/flink/src/test/resources/init_stream.scala b/flink/interpreter/src/test/resources/init_stream.scala similarity index 100% rename from flink/src/test/resources/init_stream.scala rename to flink/interpreter/src/test/resources/init_stream.scala diff --git a/flink/src/test/resources/log4j.properties b/flink/interpreter/src/test/resources/log4j.properties similarity index 90% rename from flink/src/test/resources/log4j.properties rename to flink/interpreter/src/test/resources/log4j.properties index 24ec9493fa5..23680dfceef 100644 --- a/flink/src/test/resources/log4j.properties +++ b/flink/interpreter/src/test/resources/log4j.properties @@ -21,7 +21,8 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n -log4j.logger.org.apache.hive=WARN -log4j.logger.org.apache.flink=WARN +log4j.logger.org.apache.hive=INFO +log4j.logger.org.apache.flink=INFO log4j.logger.org.apache.zeppelin.flink=DEBUG +log4j.logger.org.apache.zeppelin.python=DEBUG diff --git a/flink/pom.xml b/flink/pom.xml index 69eb026612f..e197e22700b 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -17,798 +17,52 @@ --> - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + + + zeppelin-interpreter-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../zeppelin-interpreter-parent/pom.xml + - - zeppelin-interpreter-parent org.apache.zeppelin + flink-parent + pom 0.9.0-SNAPSHOT - ../zeppelin-interpreter-parent/pom.xml - - - org.apache.zeppelin - zeppelin-flink - jar - 0.9.0-SNAPSHOT - Zeppelin: Flink - Zeppelin flink support - - - - flink - 1.10.0 - 2.6.5 - 2.3.4 - 4.0.0 - 1.15.0 - - 2.0.1 - 2.11 - 2.11.12 - - https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${scala.binary.version}.tgz - - - - - - org.apache.zeppelin - zeppelin-python - ${project.version} - - - io.atomix - * - - - com.google.guava - guava - - - - - - ${project.groupId} - zeppelin-interpreter - ${project.version} - provided - - - io.atomix - * - - - com.google.guava - guava - - - io.grpc - * - - - - - - ${project.groupId} - zeppelin-python - ${project.version} - tests - test - - - io.atomix - * - - - com.google.guava - guava - - - io.grpc - * - - - - - - org.jline - jline-terminal - 3.9.0 - - - - org.jline - jline-reader - 3.9.0 - - - - org.apache.flink - flink-python_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-core - ${flink.version} - provided - - - - org.apache.flink - flink-clients_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-runtime_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-yarn_${scala.binary.version} - ${flink.version} - provided - - - org.apache.flink - flink-shaded-hadoop2 - - - 
org.eclipse.jetty - * - - - io.netty - netty - - - - - - org.apache.flink - flink-table-api-scala_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-scala-bridge_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-scala_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-scala-shell_${scala.binary.version} - ${flink.version} - - - - org.apache.flink - flink-streaming-java_2.11 - ${flink.version} - provided - - - - org.apache.flink - flink-streaming-scala_2.11 - ${flink.version} - provided - - - - org.apache.flink - flink-java - ${flink.version} - provided - - - - org.apache.flink - flink-connector-hive_2.11 - ${flink.version} - provided - - - - org.apache.flink - flink-connector-hive_2.11 - ${flink.version} - tests - test - - - - - org.apache.flink - flink-hadoop-compatibility_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.ivy - ivy - 2.4.0 - - - - oro - - oro - 2.0.8 - - - - - com.google.code.gson - gson - - - - org.scala-lang - scala-library - ${scala.version} - - - - org.scala-lang - scala-compiler - ${scala.version} - + Zeppelin: Flink Parent + Zeppelin Flink Support - - org.scala-lang - scala-reflect - ${scala.version} - + + interpreter + flink-shims + flink1.10-shims + flink1.11-shims + - - com.mashape.unirest - unirest-java - 1.4.9 - + - - org.apache.flink - flink-connector-hive_${scala.binary.version} - ${flink.version} - provided - - - org.apache.hive - hive-metastore - - - org.apache.hive - hive-exec - - - - - org.apache.flink - flink-connector-hive_${scala.binary.version} - ${flink.version} - tests - test - - - org.apache.hive - hive-exec - - - org.apache.hive - hive-metastore - - - - - - org.apache.flink - flink-table-planner-blink_2.11 - ${flink.version} - provided - - - org.reflections - reflections - - - - - - org.apache.flink - flink-table-planner_2.11 - ${flink.version} - provided - - - - org.mockito - mockito-core - test - - - - org.powermock - powermock-api-mockito - test - - - - org.powermock - powermock-module-junit4 - test - - - - org.apache.hive - hive-metastore - ${hive.version} - provided - - - hadoop-auth - org.apache.hadoop - - - com.google.guava - guava - - - io.netty - netty - - - io.netty - netty-all - - - com.google.protobuf - protobuf-java - - - - - - org.apache.hive - hive-exec - ${hive.version} - provided - - - org.apache.calcite - calcite-core - - - org.apache.calcite - calcite-druid - - - org.apache.calcite.avatica - avatica - - - commons-codec - commons-codec - - - commons-httpclient - commons-httpclient - - - commons-io - commons-io - - - org.apache.logging.log4j - log4j-1.2-api - - - org.apache.logging.log4j - log4j-slf4j-impl - - - org.slf4j - slf4j-api - - - org.apache.zookeeper - zookeeper - - - org.apache.curator - curator-framework - - - org.apache.curator - apache-curator - - - com.google.code.gson - gson - - - jline - jline - - - com.google.guava - guava - - - io.netty - netty - - - io.netty - netty-all - - - com.google.protobuf - protobuf-java - - - - - - org.apache.hive.hcatalog - hive-webhcat-java-client - ${hive.version} - test - - - org.apache.calcite - * - - - com.google.guava - guava - - - io.netty - netty - - - javax.jms - jms - - - - - - org.apache.hive - hive-contrib - ${hive.version} - test - - - - org.apache.hive.hcatalog - hive-hcatalog-core - ${hive.version} - test - test-jar - - - jline - jline - - - com.google.guava - guava - - - io.netty - netty - - - io.netty - netty-all - - - - - - 
net.jodah - concurrentunit - 0.4.4 - test - - - - com.klarna - hiverunner - 4.0.0 - test - - - com.google.guava - guava - - - - - - net.jodah - concurrentunit - 0.4.4 - test - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - eclipse-add-source - - add-source - - - - scala-compile-first - process-resources - - compile - - - - scala-test-compile-first - process-test-resources - - testCompile - - - - - ${scala.version} - - - - -unchecked - -deprecation - -feature - -target:jvm-1.8 - - - -Xms1024m - -Xmx1024m - -XX:MaxMetaspaceSize=${MaxMetaspace} - - - -source - ${java.version} - -target - ${java.version} - -Xlint:all,-serial,-path,-options - - - - - - - com.googlecode.maven-download-plugin - download-maven-plugin - - - download-flink-files - validate - - wget - - - 60000 - 5 - ${flink.bin.download.url} - true - ${project.build.directory} - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - always - 1 - false - - -Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true - - - ${project.build.directory}/flink-${flink.version} - ${project.build.directory}/test-classes - - - - - - - org.apache.maven.plugins - maven-eclipse-plugin - - true - - org.scala-ide.sdt.core.scalanature - org.eclipse.jdt.core.javanature - - - org.scala-ide.sdt.core.scalabuilder - - - org.scala-ide.sdt.launching.SCALA_CONTAINER - org.eclipse.jdt.launching.JRE_CONTAINER - - - **/*.scala - **/*.java - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - - add-source - generate-sources - - add-source - - - - src/main/scala - - - - - - add-test-source - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - - - - maven-enforcer-plugin - - - maven-dependency-plugin - - - maven-resources-plugin - - - org.apache.maven.plugins - maven-shade-plugin - - - - *:* - - org/datanucleus/** - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - org.scala-lang:scala-library - org.scala-lang:scala-compiler - org.scala-lang:scala-reflect - org.apache.flink:* - - - - - - - reference.conf - - - - - io.netty - org.apache.zeppelin.shaded.io.netty - - - com.google - org.apache.zeppelin.shaded.com.google - - - ${project.basedir}/../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar - - - - package - - shade - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - true - - - - - - - + + org.slf4j + slf4j-api + - - hive2 - - true - - - 2.3.4 - 4.0.0 - - + + org.slf4j + slf4j-log4j12 + - - hive1 - - 1.2.1 - 3.2.1 - - - org.apache.hadoop - hadoop-common - 2.7.5 - provided + log4j + log4j - - - + + junit + junit + test + + - + \ No newline at end of file diff --git a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java deleted file mode 100644 index 6720bf2aef5..00000000000 --- a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.module.ModuleManager; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.util.Map; - -/** - * Factory class for creating flink table env for different purpose: - * 1. java/scala - * 2. stream table / batch table - * 3. flink planner / blink planner - * - */ -public class TableEnvFactory { - - private Executor executor; - private org.apache.flink.api.scala.ExecutionEnvironment benv; - private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv; - private TableConfig tblConfig; - private CatalogManager catalogManager; - private ModuleManager moduleManager; - private FunctionCatalog flinkFunctionCatalog; - private FunctionCatalog blinkFunctionCatalog; - - public TableEnvFactory(org.apache.flink.api.scala.ExecutionEnvironment env, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv, - TableConfig tblConfig, - CatalogManager catalogManager, - ModuleManager moduleManager, - FunctionCatalog flinkFunctionCatalog, - FunctionCatalog blinkFunctionCatalog) { - this.benv = env; - this.senv = senv; - this.tblConfig = tblConfig; - this.catalogManager = catalogManager; - this.moduleManager = moduleManager; - this.flinkFunctionCatalog = flinkFunctionCatalog; - this.blinkFunctionCatalog = blinkFunctionCatalog; - } - - public org.apache.flink.table.api.scala.BatchTableEnvironment createScalaFlinkBatchTableEnvironment() { - try { - Class clazz = Class - .forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl"); - Constructor constructor = clazz - .getConstructor( - org.apache.flink.api.scala.ExecutionEnvironment.class, - TableConfig.class, - CatalogManager.class, - ModuleManager.class); - - return (org.apache.flink.table.api.scala.BatchTableEnvironment) - constructor.newInstance(benv, tblConfig, catalogManager, moduleManager); - } catch (Exception e) { - throw new TableException("Fail to createScalaFlinkBatchTableEnvironment", e); - } - } - - public org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl - createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) { - - Map executorProperties = settings.toExecutorProperties(); - Executor 
executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create( - plannerProperties, - executor, - tblConfig, - flinkFunctionCatalog, - catalogManager); - - return new org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl( - catalogManager, - moduleManager, - flinkFunctionCatalog, - tblConfig, - senv, - planner, - executor, - settings.isStreamingMode() - ); - } - - public org.apache.flink.table.api.java.BatchTableEnvironment createJavaFlinkBatchTableEnvironment() { - try { - Class clazz = - Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl"); - Constructor con = clazz.getConstructor( - ExecutionEnvironment.class, TableConfig.class, CatalogManager.class, ModuleManager.class); - return (org.apache.flink.table.api.java.BatchTableEnvironment) con.newInstance( - benv.getJavaEnv(), tblConfig, catalogManager, moduleManager); - } catch (Throwable t) { - throw new TableException("Create BatchTableEnvironment failed.", t); - } - } - - public StreamTableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) { - - if (!settings.isStreamingMode()) { - throw new TableException( - "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment."); - } - - Map executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, tblConfig, flinkFunctionCatalog, catalogManager); - - return new StreamTableEnvironmentImpl( - catalogManager, - moduleManager, - flinkFunctionCatalog, - tblConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode() - ); - } - - public org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl - createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) { - - Map executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create( - plannerProperties, - executor, - tblConfig, - blinkFunctionCatalog, - catalogManager); - - return new org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl( - catalogManager, - moduleManager, - blinkFunctionCatalog, - tblConfig, - senv, - planner, - executor, - settings.isStreamingMode()); - } - - public void createPlanner(EnvironmentSettings settings) { - Map executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map plannerProperties = settings.toPlannerProperties(); - ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create( - plannerProperties, - executor, - tblConfig, - blinkFunctionCatalog, - catalogManager); - } - - public StreamTableEnvironment createJavaBlinkStreamTableEnvironment( - EnvironmentSettings settings) { - - if (!settings.isStreamingMode()) { - throw new TableException( - "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment."); - } - - Map executorProperties = settings.toExecutorProperties(); - Executor executor = 
lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager); - - return new StreamTableEnvironmentImpl( - catalogManager, - moduleManager, - blinkFunctionCatalog, - tblConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode() - ); - } - - public TableEnvironment createJavaBlinkBatchTableEnvironment( - EnvironmentSettings settings) { - final Map executorProperties = settings.toExecutorProperties(); - executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - final Map plannerProperties = settings.toPlannerProperties(); - final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager); - - return new StreamTableEnvironmentImpl( - catalogManager, - moduleManager, - blinkFunctionCatalog, - tblConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); - } - - private static Executor lookupExecutor( - Map executorProperties, - StreamExecutionEnvironment executionEnvironment) { - try { - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); - - return (Executor) createMethod.invoke( - executorFactory, - executorProperties, - executionEnvironment); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. Make sure a planner module is on the classpath", - e); - } - } -}
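
Note on the IPyFlinkInterpreterTest hunks earlier in this patch: the embedded PyFlink snippets switch from the deprecated register_table_sink(...) to create_temporary_table(...), which Flink 1.10 added on the connect() descriptor. Below is a minimal standalone sketch of the same call sequence for reference; the FileSystem/OldCsv connector, the /tmp path, and the sample rows are illustrative assumptions and are not taken from the test.

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, DataTypes
    from pyflink.table.descriptors import FileSystem, OldCsv, Schema

    # Batch environment, analogous to the bt_env used in the test snippets.
    b_env = ExecutionEnvironment.get_execution_environment()
    bt_env = BatchTableEnvironment.create(b_env)

    # Illustrative source table; the real test registers its own sources.
    t = bt_env.from_elements([(1, "hi", "hello"), (2, "hi", "hello")], ["a", "b", "c"])

    # create_temporary_table replaces the deprecated register_table_sink.
    bt_env.connect(FileSystem().path("/tmp/batch_sink")) \
        .with_format(OldCsv()
                     .field("a", DataTypes.BIGINT())
                     .field("b", DataTypes.STRING())
                     .field("c", DataTypes.STRING())) \
        .with_schema(Schema()
                     .field("a", DataTypes.BIGINT())
                     .field("b", DataTypes.STRING())
                     .field("c", DataTypes.STRING())) \
        .create_temporary_table("batch_sink")

    t.select("a + 1, b, c").insert_into("batch_sink")
    bt_env.execute("batch_job")

The same substitution applies unchanged to the streaming snippets (st_env / stream_sink) modified in the later hunks of that test file.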