From 7c6e2f5c9e4ae9576dc6a55b39277027d876389c Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sat, 6 Jun 2020 23:35:26 +0800 Subject: [PATCH] [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties --- .../src/main/java/org/apache/zeppelin/flink/FlinkShims.java | 2 +- .../main/java/org/apache/zeppelin/flink/Flink110Shims.java | 4 ++-- .../main/java/org/apache/zeppelin/flink/Flink111Shims.java | 2 +- .../java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 6 ++++-- .../org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java | 5 +++-- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index ef5f0a0af06..274bf2cc261 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -89,7 +89,7 @@ public abstract Object getCollectStreamTableSink(InetAddress targetAddress, public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception; - public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception; + public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception; public abstract boolean rowEquals(Object row1, Object row2); diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java index dec35600c51..f6d506a6c00 100644 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -109,8 +109,8 @@ public void addInsertStatement(String sql, Object tblEnv, InterpreterContext con } @Override - public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception { - ((TableEnvironment) tblEnv).execute(sql); + public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { + ((TableEnvironment) tblEnv).execute(jobName); return true; } diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java index ea11cede8b9..2480c69173b 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -116,7 +116,7 @@ public void addInsertStatement(String sql, Object tblEnv, InterpreterContext con } @Override - public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception { + public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get(); while(!jobClient.getJobStatus().get().isTerminalState()) { LOGGER.debug("Wait for job to finish"); diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java index 1e8e8033ffe..f2d31b60e90 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java @@ -223,7 +223,8 @@ private InterpreterResult runSqlList(String st, InterpreterContext context) { if (runAsOne) { try { lock.lock(); - if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) { + String jobName = context.getStringLocalProperty("jobName", st); + if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) { context.out.write("Insertion successfully.\n"); } } catch (Exception e) { @@ -532,7 +533,8 @@ public void callInsertInto(String sql, boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); if (!runAsOne) { this.tbenv.sqlUpdate(sql); - this.tbenv.execute(sql); + String jobName = context.getStringLocalProperty("jobName", sql); + this.tbenv.execute(jobName); context.out.write("Insertion successfully.\n"); } else { flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context); diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 2d98ef7a059..a8728d3d70f 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -157,8 +157,9 @@ public String run(Table table, String tableName) throws IOException { retrievalThread.start(); LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism); - stenv.execute(tableName); - LOGGER.info("Flink Job is finished, tableName: " + tableName); + String jobName = context.getStringLocalProperty("jobName", tableName); + stenv.execute(jobName); + LOGGER.info("Flink Job is finished, jobName: " + jobName); // wait for retrieve thread consume all data LOGGER.info("Waiting for retrieve thread to be done"); retrievalThread.join();