Skip to content

Commit a531fe1

Browse files
vijoshiTom Graves
authored andcommitted
[SPARK-17843][WEB UI] Indicate event logs pending for processing on history server UI
## What changes were proposed in this pull request? History Server UI's application listing to display information on currently under process event logs so a user knows that pending this processing an application may not list on the UI. When there are no event logs under process, the application list page has a "Last Updated" date-time at the top indicating the date-time of the last _completed_ scan of the event logs. The value is displayed to the user in his/her local time zone. ## How was this patch tested? All unit tests pass. Particularly all the suites under org.apache.spark.deploy.history.\* were run to test changes. - Very first startup - Pending logs - no logs processed yet: <img width="1280" alt="screen shot 2016-10-24 at 3 07 04 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640981/b8d2a96a-99fc-11e6-9b1f-2d736fe90e48.png"> - Very first startup - Pending logs - some logs processed: <img width="1280" alt="screen shot 2016-10-24 at 3 18 42 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641087/3f8e3bae-99fd-11e6-9ef1-e0e70d71d8ef.png"> - Last updated - No currently pending logs: <img width="1280" alt="screen shot 2016-10-17 at 8 34 37 pm" src="https://cloud.githubusercontent.com/assets/12079825/19443100/4d13946c-94a9-11e6-8ee2-c442729bb206.png"> - Last updated - With some currently pending logs: <img width="1280" alt="screen shot 2016-10-24 at 3 09 31 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640903/7323ba3a-99fc-11e6-8359-6a45753dbb28.png"> - No applications found and No currently pending logs: <img width="1280" alt="screen shot 2016-10-24 at 3 24 26 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641364/03a2cb04-99fe-11e6-87d6-d09587fc6201.png"> Author: Vinayak <[email protected]> Closes #15410 from vijoshi/SAAS-608_master.
1 parent 4f15d94 commit a531fe1

File tree

5 files changed

+116
-18
lines changed

5 files changed

+116
-18
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
$(document).ready(function() {
19+
if ($('#last-updated').length) {
20+
var lastUpdatedMillis = Number($('#last-updated').text());
21+
var updatedDate = new Date(lastUpdatedMillis);
22+
$('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString())
23+
}
24+
});

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,30 @@ private[history] case class LoadedAppUI(
7474

7575
private[history] abstract class ApplicationHistoryProvider {
7676

77+
/**
78+
* Returns the count of application event logs that the provider is currently still processing.
79+
* History Server UI can use this to indicate to a user that the application listing on the UI
80+
* can be expected to list additional known applications once the processing of these
81+
* application event logs completes.
82+
*
83+
* A History Provider that does not have a notion of count of event logs that may be pending
84+
* for processing need not override this method.
85+
*
86+
* @return Count of application event logs that are currently under process
87+
*/
88+
def getEventLogsUnderProcess(): Int = {
89+
return 0;
90+
}
91+
92+
/**
93+
* Returns the time the history provider last updated the application history information
94+
*
95+
* @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
96+
*/
97+
def getLastUpdatedTime(): Long = {
98+
return 0;
99+
}
100+
77101
/**
78102
* Returns a list of applications available for the history server to show.
79103
*

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
1919

2020
import java.io.{FileNotFoundException, IOException, OutputStream}
2121
import java.util.UUID
22-
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
22+
import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
2323
import java.util.zip.{ZipEntry, ZipOutputStream}
2424

2525
import scala.collection.mutable
@@ -108,7 +108,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
108108

109109
// The modification time of the newest log detected during the last scan. Currently only
110110
// used for logging msgs (logs are re-scanned based on file size, rather than modtime)
111-
private var lastScanTime = -1L
111+
private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
112112

113113
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
114114
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -120,6 +120,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
120120
// List of application logs to be deleted by event log cleaner.
121121
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
122122

123+
private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
124+
123125
/**
124126
* Return a runnable that performs the given operation on the event logs.
125127
* This operation is expected to be executed periodically.
@@ -226,6 +228,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
226228
applications.get(appId)
227229
}
228230

231+
override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
232+
233+
override def getLastUpdatedTime(): Long = lastScanTime.get()
234+
229235
override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
230236
try {
231237
applications.get(appId).flatMap { appInfo =>
@@ -329,26 +335,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
329335
if (logInfos.nonEmpty) {
330336
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
331337
}
332-
logInfos.map { file =>
333-
replayExecutor.submit(new Runnable {
338+
339+
var tasks = mutable.ListBuffer[Future[_]]()
340+
341+
try {
342+
for (file <- logInfos) {
343+
tasks += replayExecutor.submit(new Runnable {
334344
override def run(): Unit = mergeApplicationListing(file)
335345
})
336346
}
337-
.foreach { task =>
338-
try {
339-
// Wait for all tasks to finish. This makes sure that checkForLogs
340-
// is not scheduled again while some tasks are already running in
341-
// the replayExecutor.
342-
task.get()
343-
} catch {
344-
case e: InterruptedException =>
345-
throw e
346-
case e: Exception =>
347-
logError("Exception while merging application listings", e)
348-
}
347+
} catch {
348+
// let the iteration over logInfos break, since an exception on
349+
// replayExecutor.submit (..) indicates the ExecutorService is unable
350+
// to take any more submissions at this time
351+
352+
case e: Exception =>
353+
logError(s"Exception while submitting event log for replay", e)
354+
}
355+
356+
pendingReplayTasksCount.addAndGet(tasks.size)
357+
358+
tasks.foreach { task =>
359+
try {
360+
// Wait for all tasks to finish. This makes sure that checkForLogs
361+
// is not scheduled again while some tasks are already running in
362+
// the replayExecutor.
363+
task.get()
364+
} catch {
365+
case e: InterruptedException =>
366+
throw e
367+
case e: Exception =>
368+
logError("Exception while merging application listings", e)
369+
} finally {
370+
pendingReplayTasksCount.decrementAndGet()
349371
}
372+
}
350373

351-
lastScanTime = newLastScanTime
374+
lastScanTime.set(newLastScanTime)
352375
} catch {
353376
case e: Exception => logError("Exception in checking for event log updates", e)
354377
}
@@ -365,7 +388,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
365388
} catch {
366389
case e: Exception =>
367390
logError("Exception encountered when attempting to update last scan time", e)
368-
lastScanTime
391+
lastScanTime.get()
369392
} finally {
370393
if (!fs.delete(path, true)) {
371394
logWarning(s"Error deleting ${path}")

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,30 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
3030
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
3131

3232
val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
33+
val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
34+
val lastUpdatedTime = parent.getLastUpdatedTime()
3335
val providerConfig = parent.getProviderConfig()
3436
val content =
37+
<script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
3538
<div>
3639
<div class="span12">
3740
<ul class="unstyled">
3841
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
3942
</ul>
43+
{
44+
if (eventLogsUnderProcessCount > 0) {
45+
<p>There are {eventLogsUnderProcessCount} event log(s) currently being
46+
processed which may result in additional applications getting listed on this page.
47+
Refresh the page to view updates. </p>
48+
}
49+
}
50+
51+
{
52+
if (lastUpdatedTime > 0) {
53+
<p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
54+
}
55+
}
56+
4057
{
4158
if (allAppsSize > 0) {
4259
<script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
@@ -46,6 +63,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
4663
<script>setAppLimit({parent.maxApplications})</script>
4764
} else if (requestedIncomplete) {
4865
<h4>No incomplete applications found!</h4>
66+
} else if (eventLogsUnderProcessCount > 0) {
67+
<h4>No completed applications found!</h4>
4968
} else {
5069
<h4>No completed applications found!</h4> ++ parent.emptyListingHtml
5170
}

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,14 @@ class HistoryServer(
179179
provider.getListing()
180180
}
181181

182+
def getEventLogsUnderProcess(): Int = {
183+
provider.getEventLogsUnderProcess()
184+
}
185+
186+
def getLastUpdatedTime(): Long = {
187+
provider.getLastUpdatedTime()
188+
}
189+
182190
def getApplicationInfoList: Iterator[ApplicationInfo] = {
183191
getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
184192
}

0 commit comments

Comments
 (0)