From 9691eae820f1762f900553abd0865c008319163e Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 10 Jun 2020 22:35:08 +0800 Subject: [PATCH 1/3] [ZEPPELIN-4873]. Display rich duration info for insert into flink job --- .../org/apache/zeppelin/flink/JobManager.java | 56 ++++++++++++++----- .../apache/zeppelin/flink/JobManagerTest.java | 43 ++++++++++++++ 2 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index 471ae75ad53..db9f1e8acfd 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class JobManager { @@ -110,7 +111,7 @@ public int getJobProgress(String paragraphId) { } public void cancelJob(InterpreterContext context) throws InterpreterException { - LOGGER.info("Canceling job associated of paragraph: "+ context.getParagraphId()); + LOGGER.info("Canceling job associated of paragraph: " + context.getParagraphId()); JobClient jobClient = this.jobs.get(context.getParagraphId()); if (jobClient == null) { LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph", @@ -172,8 +173,12 @@ class FlinkJobProgressPoller extends Thread { @Override public void run() { while (!Thread.currentThread().isInterrupted() && running.get()) { + JsonNode rootNode = null; try { + synchronized (running) { + running.wait(1000); + } rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString()) .asJson().getBody(); JSONArray vertices = rootNode.getObject().getJSONArray("vertices"); @@ -194,13 +199,12 @@ public void run() { if (jobState.equalsIgnoreCase("finished")) { break; } - synchronized (running) { - running.wait(1000); - } + long duration = rootNode.getObject().getLong("duration") / 1000; + if (isStreamingInsertInto) { if (isFirstPoll) { StringBuilder builder = new StringBuilder("%angular "); - builder.append("

Duration: {{duration}} seconds"); + builder.append("

Duration: {{duration}}

"); builder.append("\n%text "); context.out.clear(false); context.out.write(builder.toString()); @@ -208,7 +212,7 @@ public void run() { isFirstPoll = false; } context.getAngularObjectRegistry().add("duration", - rootNode.getObject().getLong("duration") / 1000, + toRichTimeDuration(duration), context.getNoteId(), context.getParagraphId()); } @@ -218,15 +222,39 @@ public void run() { } } - public void cancel () { - this.running.set(false); - synchronized (running) { - running.notify(); - } + public void cancel() { + this.running.set(false); + synchronized (running) { + running.notify(); } + } - public int getProgress () { - return progress; - } + public int getProgress() { + return progress; } } + + static String toRichTimeDuration(long duration) { + long days = TimeUnit.SECONDS.toDays(duration); + duration -= TimeUnit.DAYS.toSeconds(days); + long hours = TimeUnit.SECONDS.toHours(duration); + duration -= TimeUnit.HOURS.toSeconds(hours); + long minutes = TimeUnit.SECONDS.toMinutes(duration); + duration -= TimeUnit.MINUTES.toSeconds(minutes); + long seconds = TimeUnit.SECONDS.toSeconds(duration); + + StringBuilder builder = new StringBuilder(); + if (days != 0) { + builder.append(days + " days "); + } + if (days != 0 || hours != 0) { + builder.append(hours + " hours "); + } + if (days != 0 || hours != 0 || minutes != 0) { + builder.append(minutes + " minutes "); + } + builder.append(seconds + " seconds"); + return builder.toString(); + } + +} diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java new file mode 100644 index 00000000000..fe196e93be1 --- /dev/null +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class JobManagerTest { + + @Test + public void testRichDuration() { + String richDuration = JobManager.toRichTimeDuration(18); + assertEquals("18 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(120); + assertEquals("2 minutes 0 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(60 * 60 + 1); + assertEquals("1 hours 0 minutes 1 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(60 * 60 + 60 + 1); + assertEquals("1 hours 1 minutes 1 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(24 * 60 * 60 + 60 + 1); + assertEquals("1 days 0 hours 1 minutes 1 seconds", richDuration); + } +} From 419818e4a19d27623b1c0ad9346f6f7a75f37214 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 12 Jun 2020 17:08:58 +0800 Subject: [PATCH 2/3] address comment --- .../src/main/java/org/apache/zeppelin/flink/JobManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index db9f1e8acfd..e8ed05758e5 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -111,7 +111,7 @@ public int getJobProgress(String paragraphId) { } public void cancelJob(InterpreterContext context) throws InterpreterException { - LOGGER.info("Canceling job associated of paragraph: " + context.getParagraphId()); + LOGGER.info("Canceling job associated of paragraph: {}", context.getParagraphId()); JobClient jobClient = this.jobs.get(context.getParagraphId()); if (jobClient == null) { LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph", From 459f181c52e0194e7b639cbcfbdcad2c016e46bd Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 15 Jun 2020 15:39:16 +0800 Subject: [PATCH 3/3] add java doc --- .../main/java/org/apache/zeppelin/flink/JobManager.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index e8ed05758e5..325665f6362 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -234,6 +235,12 @@ public int getProgress() { } } + /** + * Convert duration in seconds to rich time duration format. e.g. 2 days 3 hours 4 minutes 5 seconds + * + * @param duration in second + * @return + */ static String toRichTimeDuration(long duration) { long days = TimeUnit.SECONDS.toDays(duration); duration -= TimeUnit.DAYS.toSeconds(days);