diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java index 235676d0b28..471ae75ad53 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -160,6 +160,7 @@ class FlinkJobProgressPoller extends Thread { private boolean isStreamingInsertInto; private int progress; private AtomicBoolean running = new AtomicBoolean(true); + private boolean isFirstPoll = true; FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) { this.flinkWebUI = flinkWebUI; @@ -197,15 +198,19 @@ public void run() { running.wait(1000); } if (isStreamingInsertInto) { - StringBuilder builder = new StringBuilder("%html "); - builder.append("

Duration: " + - rootNode.getObject().getLong("duration") / 1000 + - " seconds"); - builder.append("\n%text "); - context.out.clear(false); - sendFlinkJobUrl(context); - context.out.write(builder.toString()); - context.out.flush(); + if (isFirstPoll) { + StringBuilder builder = new StringBuilder("%angular "); + builder.append("

Duration: {{duration}} seconds"); + builder.append("\n%text "); + context.out.clear(false); + context.out.write(builder.toString()); + context.out.flush(); + isFirstPoll = false; + } + context.getAngularObjectRegistry().add("duration", + rootNode.getObject().getLong("duration") / 1000, + context.getNoteId(), + context.getParagraphId()); } } catch (Exception e) { LOGGER.error("Fail to poll flink job progress via rest api, rest api: " + rootNode, e); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 20bad294ea6..3c3125f35b3 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -20,9 +20,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.flink.types.Row; -import org.apache.flink.util.StringUtils; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.tabledata.TableDataUtils; @@ -36,6 +34,7 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { private Row latestRow; private String template; + private boolean isFirstRefresh = true; public SingleRowStreamSqlJob(StreamExecutionEnvironment senv, TableEnvironment stenv, @@ -64,11 +63,10 @@ protected void processDelete(Row row) { @Override protected String buildResult() { StringBuilder builder = new StringBuilder(); - builder.append("%html\n"); + builder.append("%angular "); String outputText = template; for (int i = 0; i < latestRow.getArity(); ++i) { - outputText = outputText.replace("{" + i + "}", - TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(latestRow.getField(i)))); + outputText = outputText.replace("{" + i + "}", "{{value_" + i + "}}"); } builder.append(outputText); return builder.toString(); @@ -80,10 +78,19 @@ protected void refresh(InterpreterContext context) throws Exception { LOGGER.warn("Skip RefreshTask as no data available"); return; } - context.out().clear(false); - String output = buildResult(); - context.out.write(output); - LOGGER.debug("Refresh Output: " + output); - context.out.flush(); + if (isFirstRefresh) { + context.out().clear(false); + String output = buildResult(); + context.out.write(output); + context.out.flush(); + isFirstRefresh = false; + } + + for (int i = 0; i < latestRow.getArity(); ++i) { + context.getAngularObjectRegistry().add("value_" + i, + TableDataUtils.normalizeColumn(latestRow.getField(i)), + context.getNoteId(), + context.getParagraphId()); + } } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2cea70c87b1..dda54137f36 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -161,6 +161,7 @@ public class JDBCInterpreter extends KerberosInterpreter { private SqlSplitter sqlSplitter; private Map refreshExecutorServices = new HashMap<>(); + private Map isFirstRefreshMap = new HashMap<>(); private Map paragraphCancelMap = new HashMap<>(); public JDBCInterpreter(Properties property) { @@ -577,32 +578,10 @@ && isNotEmpty(properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY))) { return null; } - private String getResults(ResultSet resultSet, - boolean isTableType, - String template) + private String getResults(ResultSet resultSet, boolean isTableType) throws SQLException { ResultSetMetaData md = resultSet.getMetaData(); - - /** - * If html template is provided, only fetch the first row. - */ - if (template != null) { - resultSet.next(); - String result = "%html " + template + "\n"; - for (int i = 1; i <= md.getColumnCount(); ++i) { - Object columnObject = resultSet.getObject(i); - String columnValue = null; - if (columnObject == null) { - columnValue = "null"; - } else { - columnValue = resultSet.getString(i); - } - result = result.replace("{" + (i - 1) + "}", columnValue); - } - return result; - } - StringBuilder msg; if (isTableType) { msg = new StringBuilder(TABLE_MAGIC_TAG); @@ -759,11 +738,38 @@ private InterpreterResult executeSql(String propertyKey, String sql, resultSet.getMetaData().getColumnCount())) { context.out.write("%text Query executed successfully.\n"); } else { - String results = getResults(resultSet, - !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE), - context.getLocalProperties().get("template")); - context.out.write(results); - context.out.write("\n%text "); + String template = context.getLocalProperties().get("template"); + if (!StringUtils.isBlank(template)) { + resultSet.next(); + ResultSetMetaData md = resultSet.getMetaData(); + if (isFirstRefreshMap.get(context.getParagraphId())) { + String angularTemplate = template; + for (int j = 0; j < md.getColumnCount(); ++j) { + angularTemplate = angularTemplate.replace("{" + j + "}", "{{value_" + j + "}}"); + } + context.out.write("%angular " + angularTemplate); + context.out.write("\n%text "); + context.out.flush(); + isFirstRefreshMap.put(context.getParagraphId(), false); + } + for (int j = 1; j <= md.getColumnCount(); ++j) { + Object columnObject = resultSet.getObject(j); + String columnValue = null; + if (columnObject == null) { + columnValue = "null"; + } else { + columnValue = resultSet.getString(j); + } + context.getAngularObjectRegistry().add("value_" + (j - 1), + columnValue, context.getNoteId(), context.getParagraphId()); + } + } else { + String results = getResults(resultSet, + !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE)); + context.out.write(results); + context.out.write("\n%text "); + context.out.flush(); + } } } else { // Response contains either an update count or there are no results. @@ -851,6 +857,7 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex paragraphCancelMap.put(context.getParagraphId(), false); ScheduledExecutorService refreshExecutor = Executors.newSingleThreadScheduledExecutor(); refreshExecutorServices.put(context.getParagraphId(), refreshExecutor); + isFirstRefreshMap.put(context.getParagraphId(), true); final AtomicReference interpreterResultRef = new AtomicReference(); refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false);