diff --git a/flink/pom.xml b/flink/pom.xml
index 9be4fe51f94..21ec1826554 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -492,6 +492,10 @@
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>javax.jms</groupId>
+         <artifactId>jms</artifactId>
+       </exclusion>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index d794b4be019..82ac50e1a14 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -22,12 +22,16 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.PythonOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -42,15 +46,14 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class FlinkSqlInterrpeter extends Interpreter {
@@ -69,6 +72,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
.append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+ .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
.append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
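
For illustration, the new SET support lets a paragraph scope a config override to itself; the statements below mirror the ones exercised in the test further down (%flink.bsql is Zeppelin's batch SQL binding):

    %flink.bsql
    set table.exec.resource.default-parallelism=10;
    insert into sink_table select * from source_table;
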
@@ -86,7 +90,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
private SqlSplitter sqlSplitter;
private int defaultSqlParallelism;
private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
-
+ // all the available table/sql config options, see
+ // https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
+ private Map<String, ConfigOption> tableConfigOptions;
+ // the config options set in the current paragraph via SET statements
+ private Map<String, String> currentConfigOptions = new HashMap<>();
public FlinkSqlInterrpeter(Properties properties) {
super(properties);
@@ -117,6 +125,31 @@ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Null
flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
+ this.tableConfigOptions = extractTableConfigOptions();
+ }
+
+ private Map<String, ConfigOption> extractTableConfigOptions() {
+ Map<String, ConfigOption> configOptions = new HashMap<>();
+ configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+ configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+ configOptions.putAll(extractConfigOptions(PythonOptions.class));
+ return configOptions;
+ }
+
+ private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+ Map<String, ConfigOption> configOptions = new HashMap<>();
+ Field[] fields = clazz.getDeclaredFields();
+ for (Field field : fields) {
+ if (field.getType().isAssignableFrom(ConfigOption.class)) {
+ try {
+ ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
+ configOptions.put(configOption.key(), configOption);
+ } catch (Throwable e) {
+ LOGGER.warn("Fail to get ConfigOption", e);
+ }
+ }
+ }
+ return configOptions;
}
@Override
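
As background, extractConfigOptions() works by reflecting over the public static ConfigOption constants that Flink's option-holder classes declare. A minimal standalone sketch of that pattern, assuming Flink's table API jars on the classpath (the ConfigOptionScanner class itself is illustrative, not part of this patch):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Collects every public static ConfigOption constant declared on a class,
// keyed by its option key, e.g. "table.exec.resource.default-parallelism".
public class ConfigOptionScanner {
  public static Map<String, ConfigOption<?>> scan(Class<?> clazz) {
    Map<String, ConfigOption<?>> options = new HashMap<>();
    for (Field field : clazz.getDeclaredFields()) {
      // Flink declares its options as public static final ConfigOption fields.
      if (Modifier.isStatic(field.getModifiers())
          && ConfigOption.class.isAssignableFrom(field.getType())) {
        try {
          // For static fields the receiver argument of Field#get is ignored.
          ConfigOption<?> option = (ConfigOption<?>) field.get(null);
          options.put(option.key(), option);
        } catch (IllegalAccessException e) {
          // skip unreadable fields
        }
      }
    }
    return options;
  }

  public static void main(String[] args) {
    scan(ExecutionConfigOptions.class).keySet().forEach(System.out::println);
  }
}
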
@@ -139,6 +172,7 @@ public InterpreterResult interpret(String st,
}
private InterpreterResult runSqlList(String st, InterpreterContext context) {
+ currentConfigOptions.clear();
List<String> sqls = sqlSplitter.splitSql(st);
for (String sql : sqls) {
Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql);
@@ -210,6 +244,9 @@ private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
case SELECT:
callSelect(cmdCall.operands[0], context);
break;
+ case SET:
+ callSet(cmdCall.operands[0], cmdCall.operands[1], context);
+ break;
case INSERT_INTO:
case INSERT_OVERWRITE:
callInsertInto(cmdCall.operands[0], context);
@@ -401,25 +438,47 @@ private void callExplain(String sql, InterpreterContext context) throws IOExcept
public void callSelect(String sql, InterpreterContext context) throws IOException {
try {
lock.lock();
+ // set parallelism from paragraph local property
if (context.getLocalProperties().containsKey("parallelism")) {
this.tbenv.getConfig().getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
Integer.parseInt(context.getLocalProperties().get("parallelism")));
}
- callInnerSelect(sql, context);
+ // apply the table config options set via SET statements so far
+ for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
+ this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
+ }
+ callInnerSelect(sql, context);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
+ // reset parallelism
this.tbenv.getConfig().getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
defaultSqlParallelism);
+ // reset table config
+ for (ConfigOption configOption : tableConfigOptions.values()) {
+ // some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
+ if (configOption.defaultValue() != null) {
+ this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
+ }
+ }
+ this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
}
}
public abstract void callInnerSelect(String sql, InterpreterContext context) throws IOException;
+ public void callSet(String key, String value, InterpreterContext context) throws IOException {
+ if (!tableConfigOptions.containsKey(key)) {
+ throw new IOException(key + " is not a valid table/sql config, please check this link: " +
+ "https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
+ }
+ currentConfigOptions.put(key, value);
+ }
+
private void callInsertInto(String sql,
InterpreterContext context) throws IOException {
if (!isBatch()) {
@@ -432,6 +491,12 @@ private void callInsertInto(String sql,
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
Integer.parseInt(context.getLocalProperties().get("parallelism")));
}
+
+ // apply the table config options set via SET statements so far
+ for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
+ this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
+ }
+
this.tbenv.sqlUpdate(sql);
this.tbenv.execute(sql);
} catch (Exception e) {
@@ -440,9 +505,19 @@ private void callInsertInto(String sql,
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
+
+ // reset parallelism
this.tbenv.getConfig().getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
defaultSqlParallelism);
+ // reset table config
+ for (ConfigOption configOption : tableConfigOptions.values()) {
+ // some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
+ if (configOption.defaultValue() != null) {
+ this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
+ }
+ }
+ this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
}
context.out.write("Insertion successfully.\n");
}
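
The try/finally blocks in callSelect and callInsertInto above are what give SET its paragraph scope: overrides are applied just before the job runs, and documented defaults are restored afterwards. A minimal sketch of that override-then-reset pattern against Flink's Configuration API, assuming Flink 1.10 on the classpath (the demo class is hypothetical):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

// Applies a per-run config override, then restores the option's documented
// default so subsequent runs (paragraphs) are unaffected.
public class ScopedConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10);
    try {
      // run the job with the override in effect
      System.out.println("running with parallelism "
          + conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
    } finally {
      // restore the default so later runs are unaffected
      conf.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
          ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue());
    }
  }
}
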
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 3fe35c514ea..c75d7fe40f3 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -20,6 +20,8 @@
import org.apache.commons.io.FileUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -237,4 +239,61 @@ public void testInsertInto() throws InterpreterException, IOException {
// resultMessages = context.out.toInterpreterResultMessage();
// assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
}
+
+ @Test
+ public void testSetTableConfig() throws InterpreterException, IOException {
+ hiveShell.execute("create table source_table (id int, name string)");
+ hiveShell.execute("insert into source_table values(1, 'a'), (2, 'b')");
+
+ File destDir = Files.createTempDirectory("flink_test").toFile();
+ FileUtils.deleteDirectory(destDir);
+ InterpreterResult result = sqlInterpreter.interpret(
+ "CREATE TABLE sink_table (\n" +
+ "id int,\n" +
+ "name string" +
+ ") WITH (\n" +
+ "'format.field-delimiter'=',',\n" +
+ "'connector.type'='filesystem',\n" +
+ "'format.derive-schema'='true',\n" +
+ "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+ "'format.type'='csv'\n" +
+ ");", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // set parallelism then insert into
+ InterpreterContext context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "set table.exec.resource.default-parallelism=10;" +
+ "insert into sink_table select * from source_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+ assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
+ sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+
+ // set predicate-pushdown then insert into
+ destDir.delete();
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "set table.optimizer.source.predicate-pushdown-enabled=false;" +
+ "insert into sink_table select * from source_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+ assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
+ sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+ assertEquals(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED.defaultValue(),
+ sqlInterpreter.tbenv.getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED));
+
+ // invalid config
+ destDir.delete();
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "set table.invalid_config=false;" +
+ "insert into sink_table select * from source_table", context);
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertTrue(resultMessages.get(0).getData(),
+ resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
+ }
}