4 changes: 4 additions & 0 deletions flink/pom.xml
@@ -492,6 +492,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
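(Editor's note on the exclusion above: javax.jms:jms is presumably excluded because that artifact is not resolvable from Maven Central, so pulling it in transitively tends to break dependency resolution.)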

flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -22,12 +22,16 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -42,15 +46,14 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class FlinkSqlInterrpeter extends Interpreter {
Expand All @@ -69,6 +72,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
.append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
.append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
@@ -86,7 +90,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
private SqlSplitter sqlSplitter;
private int defaultSqlParallelism;
private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();

// all the available sql config options. see
// https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
private Map<String, ConfigOption> tableConfigOptions;
// config options set via SET statements in the current paragraph
private Map<String, String> currentConfigOptions = new HashMap<>();

public FlinkSqlInterrpeter(Properties properties) {
super(properties);
@@ -117,6 +125,31 @@ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Null
flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
this.tableConfigOptions = extractTableConfigOptions();
}

private Map<String, ConfigOption> extractTableConfigOptions() {
Map<String, ConfigOption> configOptions = new HashMap<>();
configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
configOptions.putAll(extractConfigOptions(PythonOptions.class));
return configOptions;
}

private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
Map<String, ConfigOption> configOptions = new HashMap<>();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (field.getType().isAssignableFrom(ConfigOption.class)) {
try {
// these ConfigOption fields are static, so the instance argument is ignored
ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
configOptions.put(configOption.key(), configOption);
} catch (Throwable e) {
LOGGER.warn("Failed to get ConfigOption", e);
}
}
}
return configOptions;
}

@Override
@@ -139,6 +172,7 @@ public InterpreterResult interpret(String st,
}

private InterpreterResult runSqlList(String st, InterpreterContext context) {
currentConfigOptions.clear();
List<String> sqls = sqlSplitter.splitSql(st);
for (String sql : sqls) {
Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql);
@@ -210,6 +244,9 @@ private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
case SELECT:
callSelect(cmdCall.operands[0], context);
break;
case SET:
callSet(cmdCall.operands[0], cmdCall.operands[1], context);
break;
case INSERT_INTO:
case INSERT_OVERWRITE:
callInsertInto(cmdCall.operands[0], context);
@@ -401,25 +438,47 @@ private void callExplain(String sql, InterpreterContext context) throws IOExcept
public void callSelect(String sql, InterpreterContext context) throws IOException {
try {
lock.lock();
// set parallelism from paragraph local property
if (context.getLocalProperties().containsKey("parallelism")) {
this.tbenv.getConfig().getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
Integer.parseInt(context.getLocalProperties().get("parallelism")));
}
// apply table configs set via SET statements in this paragraph
for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}
callInnerSelect(sql, context);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
// reset parallelism
this.tbenv.getConfig().getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
defaultSqlParallelism);
// reset table config
for (ConfigOption configOption: tableConfigOptions.values()) {
// some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
if (configOption.defaultValue() != null) {
this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
}
}
this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
}
}

public abstract void callInnerSelect(String sql, InterpreterContext context) throws IOException;

public void callSet(String key, String value, InterpreterContext context) throws IOException {
if (!tableConfigOptions.containsKey(key)) {
throw new IOException(key + " is not a valid table/sql config, please check: " +
"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
}
currentConfigOptions.put(key, value);
}

private void callInsertInto(String sql,
InterpreterContext context) throws IOException {
if (!isBatch()) {
@@ -432,6 +491,12 @@ private void callInsertInto(String sql,
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
Integer.parseInt(context.getLocalProperties().get("parallelism")));
}

// apply table configs set via SET statements in this paragraph
for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
}

this.tbenv.sqlUpdate(sql);
this.tbenv.execute(sql);
} catch (Exception e) {
@@ -440,9 +505,19 @@ private void callInsertInto(String sql,
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}

// reset parallelism
this.tbenv.getConfig().getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
defaultSqlParallelism);
// reset table config
for (ConfigOption configOption: tableConfigOptions.values()) {
// some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
if (configOption.defaultValue() != null) {
this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
}
}
this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
}
context.out.write("Insertion successfully.\n");
}
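The mechanism this file adds can be summarised in a short, self-contained sketch. This is an editor's illustration, not code from the PR: it assumes only the Flink 1.10 APIs already used in this diff (Configuration, ConfigOption, ExecutionConfigOptions), and the class and variable names are illustrative.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class SetLifecycleSketch {

  public static void main(String[] args) {
    // 1. Harvest key -> ConfigOption, using the same reflection pattern
    //    as extractConfigOptions() above.
    Map<String, ConfigOption> knownOptions = new HashMap<>();
    for (Field field : ExecutionConfigOptions.class.getDeclaredFields()) {
      if (field.getType().isAssignableFrom(ConfigOption.class)) {
        try {
          // the fields are static, so the instance argument to get() is ignored
          ConfigOption option = (ConfigOption) field.get(null);
          knownOptions.put(option.key(), option);
        } catch (Throwable t) {
          // skip inaccessible fields in this sketch
        }
      }
    }

    // 2. Stage "SET key=value;" as a plain string pair, rejecting unknown
    //    keys up front (cf. callSet()).
    Map<String, String> staged = new HashMap<>();
    String key = "table.exec.resource.default-parallelism";
    if (!knownOptions.containsKey(key)) {
      throw new IllegalArgumentException(key + " is not a valid table/sql config");
    }
    staged.put(key, "10");

    // 3. Apply the staged values just before a job runs
    //    (cf. callSelect() / callInsertInto()).
    Configuration conf = new Configuration();
    staged.forEach(conf::setString);

    // 4. Afterwards, roll every known option back to its default so a SET
    //    never leaks into the next paragraph; options without a default,
    //    e.g. table.exec.disabled-operators, are left untouched.
    for (ConfigOption option : knownOptions.values()) {
      if (option.defaultValue() != null) {
        conf.set(option, option.defaultValue());
      }
    }
  }
}

The hunk below adds the corresponding test coverage.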
@@ -20,6 +20,8 @@


import org.apache.commons.io.FileUtils;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -237,4 +239,61 @@ public void testInsertInto() throws InterpreterException, IOException {
// resultMessages = context.out.toInterpreterResultMessage();
// assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
}

@Test
public void testSetTableConfig() throws InterpreterException, IOException {
hiveShell.execute("create table source_table (id int, name string)");
hiveShell.execute("insert into source_table values(1, 'a'), (2, 'b')");

File destDir = Files.createTempDirectory("flink_test").toFile();
FileUtils.deleteDirectory(destDir);
InterpreterResult result = sqlInterpreter.interpret(
"CREATE TABLE sink_table (\n" +
"id int,\n" +
"name string" +
") WITH (\n" +
"'format.field-delimiter'=',',\n" +
"'connector.type'='filesystem',\n" +
"'format.derive-schema'='true',\n" +
"'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
"'format.type'='csv'\n" +
");", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());

// set parallelism then insert into
InterpreterContext context = getInterpreterContext();
result = sqlInterpreter.interpret(
"set table.exec.resource.default-parallelism=10;" +
"insert into sink_table select * from source_table", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));

// set predicate-pushdown then insert into
destDir.delete();
context = getInterpreterContext();
result = sqlInterpreter.interpret(
"set table.optimizer.source.predicate-pushdown-enabled=false;" +
"insert into sink_table select * from source_table", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
assertEquals(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED.defaultValue(),
sqlInterpreter.tbenv.getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED));

// invalid config
destDir.delete();
context = getInterpreterContext();
result = sqlInterpreter.interpret(
"set table.invalid_config=false;" +
"insert into sink_table select * from source_table", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertTrue(resultMessages.get(0).getData(),
resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
}
}
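End-to-end, the behaviour these tests pin down is: a paragraph such as "set table.exec.resource.default-parallelism=10; insert into sink_table select * from source_table" applies the setting only to the job launched in that paragraph; once the job finishes, every recognised option is reset to its default (the defaultValue() assertions above), and an unrecognised key fails the paragraph before any job is submitted.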