Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public abstract Object getCollectStreamTableSink(InetAddress targetAddress,

public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;

public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception;
public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception;

public abstract boolean rowEquals(Object row1, Object row2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public void addInsertStatement(String sql, Object tblEnv, InterpreterContext con
}

@Override
public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
((TableEnvironment) tblEnv).execute(sql);
public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
((TableEnvironment) tblEnv).execute(jobName);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void addInsertStatement(String sql, Object tblEnv, InterpreterContext con
}

@Override
public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
while(!jobClient.getJobStatus().get().isTerminalState()) {
LOGGER.debug("Wait for job to finish");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ private InterpreterResult runSqlList(String st, InterpreterContext context) {
if (runAsOne) {
try {
lock.lock();
if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) {
String jobName = context.getStringLocalProperty("jobName", st);
if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) {
context.out.write("Insertion successfully.\n");
}
} catch (Exception e) {
Expand Down Expand Up @@ -532,7 +533,8 @@ public void callInsertInto(String sql,
boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
if (!runAsOne) {
this.tbenv.sqlUpdate(sql);
this.tbenv.execute(sql);
String jobName = context.getStringLocalProperty("jobName", sql);
this.tbenv.execute(jobName);
context.out.write("Insertion successfully.\n");
} else {
flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ public String run(Table table, String tableName) throws IOException {
retrievalThread.start();

LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism);
stenv.execute(tableName);
LOGGER.info("Flink Job is finished, tableName: " + tableName);
String jobName = context.getStringLocalProperty("jobName", tableName);
stenv.execute(jobName);
LOGGER.info("Flink Job is finished, jobName: " + jobName);
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
retrievalThread.join();
Expand Down