@@ -19,6 +19,7 @@ package org.apache.spark.streaming.ui
1919
2020import java .text .SimpleDateFormat
2121import java .util .Date
22+ import java .util .concurrent .TimeUnit
2223import javax .servlet .http .HttpServletRequest
2324
2425import scala .collection .mutable .ArrayBuffer
@@ -38,8 +39,8 @@ import org.apache.spark.util.Distribution
3839 * @param maxY the max value of Y axis
3940 * @param unitY the unit of Y axis
4041 */
41- private [ui] case class TimelineUIData (divId : String , data : Seq [(Long , _)], minX : Long , maxX : Long ,
42- minY : Long , maxY : Long , unitY : String ) {
42+ private [ui] class TimelineUIData (divId : String , data : Seq [(Long , _)], minX : Long , maxX : Long ,
43+ minY : Double , maxY : Double , unitY : String ) {
4344
4445 def toHtml (jsCollector : JsCollector ): Seq [Node ] = {
4546 val jsForData = data.map { case (x, y) =>
@@ -60,8 +61,8 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX:
6061 * @param maxY the max value of Y axis
6162 * @param unitY the unit of Y axis
6263 */
63- private [ui] case class DistributionUIData (
64- divId : String , data : Seq [_], minY : Long , maxY : Long , unitY : String ) {
64+ private [ui] class DistributionUIData (
65+ divId : String , data : Seq [_], minY : Double , maxY : Double , unitY : String ) {
6566
6667 def toHtml (jsCollector : JsCollector ): Seq [Node ] = {
6768 val jsForData = data.mkString(" [" , " ," , " ]" )
@@ -72,7 +73,11 @@ private[ui] case class DistributionUIData(
7273 }
7374}
7475
75- private [ui] case class MillisecondsStatUIData (data : Seq [(Long , Long )]) {
76+ private [ui] class MillisecondsStatUIData (data : Seq [(Long , Long )]) {
77+
78+ def timelineData (unit : TimeUnit ) = data.map(x => x._1 -> StreamingPage .convertToTimeUnit(x._2, unit))
79+
80+ def distributionData (unit : TimeUnit ) = data.map(x => StreamingPage .convertToTimeUnit(x._2, unit))
7681
7782 val avg : Option [Long ] = if (data.isEmpty) None else Some (data.map(_._2).sum / data.size)
7883
@@ -81,7 +86,7 @@ private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
8186 val max : Option [Long ] = if (data.isEmpty) None else Some (data.map(_._2).max)
8287}
8388
84- private [ui] case class DoubleStatUIData (data : Seq [(Long , Double )]) {
89+ private [ui] class DoubleStatUIData (val data : Seq [(Long , Double )]) {
8590
8691 val avg : Option [Double ] = if (data.isEmpty) None else Some (data.map(_._2).sum / data.size)
8792
@@ -158,30 +163,31 @@ private[ui] class StreamingPage(parent: StreamingTab)
158163 val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
159164 val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
160165
161- val eventRateForAllReceivers = DoubleStatUIData (batchInfos.map { batchInfo =>
166+ val eventRateForAllReceivers = new DoubleStatUIData (batchInfos.map { batchInfo =>
162167 (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
163168 })
164169
165- val schedulingDelay = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
170+ val schedulingDelay = new MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
166171 batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
167172 })
168- val processingTime = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
173+ val processingTime = new MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
169174 batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
170175 })
171- val totalDelay = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
176+ val totalDelay = new MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
172177 batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
173178 })
174179
175180 val jsCollector = new JsCollector
176181
177182 // Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the
178183 // Y axis ranges same.
179- val maxTime =
184+ val _maxTime =
180185 (for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield
181186 m1 max m2 max m3).getOrElse(0L )
182- List (1 , 2 , 3 ).sum
183187 // Should start at 0
184188 val minTime = 0L
189+ val (maxTime, unit) = UIUtils .normalizeDuration(_maxTime)
190+ val formattedUnit = UIUtils .shortTimeUnitString(unit)
185191
186192 // Use the max input rate for all receivers' graphs to make the Y axis ranges same.
187193 // If it's not an integral number, just use its ceil integral number.
@@ -196,7 +202,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
196202 |else $$ (this).html(' $BLACK_RIGHT_TRIANGLE_HTML'); """ .stripMargin.replaceAll(" \\ n" , " " )
197203
198204 val timelineDataForEventRateOfAllReceivers =
199- TimelineUIData (
205+ new TimelineUIData (
200206 " all-receiver-events-timeline" ,
201207 eventRateForAllReceivers.data,
202208 minBatchTime,
@@ -206,81 +212,83 @@ private[ui] class StreamingPage(parent: StreamingTab)
206212 " events/sec" ).toHtml(jsCollector)
207213
208214 val distributionDataForEventRateOfAllReceivers =
209- DistributionUIData (
215+ new DistributionUIData (
210216 " all-receiver-events-distribution" ,
211217 eventRateForAllReceivers.data.map(_._2),
212218 minEventRate,
213219 maxEventRate,
214220 " events/sec" ).toHtml(jsCollector)
215221
216222 val timelineDataForSchedulingDelay =
217- TimelineUIData (
223+ new TimelineUIData (
218224 " scheduling-delay-timeline" ,
219- schedulingDelay.data ,
225+ schedulingDelay.timelineData(unit) ,
220226 minBatchTime,
221227 maxBatchTime,
222228 minTime,
223229 maxTime,
224- " ms " ).toHtml(jsCollector)
230+ formattedUnit ).toHtml(jsCollector)
225231
226232 val distributionDataForSchedulingDelay =
227- DistributionUIData (
233+ new DistributionUIData (
228234 " scheduling-delay-distribution" ,
229- schedulingDelay.data.map(_._2 ),
235+ schedulingDelay.distributionData(unit ),
230236 minTime,
231237 maxTime,
232- " ms " ).toHtml(jsCollector)
238+ formattedUnit ).toHtml(jsCollector)
233239
234240 val timelineDataForProcessingTime =
235- TimelineUIData (
241+ new TimelineUIData (
236242 " processing-time-timeline" ,
237- processingTime.data ,
243+ processingTime.timelineData(unit) ,
238244 minBatchTime,
239245 maxBatchTime,
240246 minTime,
241247 maxTime,
242- " ms " ).toHtml(jsCollector)
248+ formattedUnit ).toHtml(jsCollector)
243249
244250 val distributionDataForProcessingTime =
245- DistributionUIData (
251+ new DistributionUIData (
246252 " processing-time-distribution" ,
247- processingTime.data.map(_._2 ),
253+ processingTime.distributionData(unit ),
248254 minTime,
249255 maxTime,
250- " ms " ).toHtml(jsCollector)
256+ formattedUnit ).toHtml(jsCollector)
251257
252258 val timelineDataForTotalDelay =
253- TimelineUIData (
259+ new TimelineUIData (
254260 " total-delay-timeline" ,
255- totalDelay.data ,
261+ totalDelay.timelineData(unit) ,
256262 minBatchTime,
257263 maxBatchTime,
258264 minTime,
259265 maxTime,
260- " ms " ).toHtml(jsCollector)
266+ formattedUnit ).toHtml(jsCollector)
261267
262268 val distributionDataForTotalDelay =
263- DistributionUIData (
269+ new DistributionUIData (
264270 " total-delay-distribution" ,
265- totalDelay.data.map(_._2 ),
271+ totalDelay.distributionData(unit ),
266272 minTime,
267273 maxTime,
268- " ms " ).toHtml(jsCollector)
274+ formattedUnit ).toHtml(jsCollector)
269275
270276 val table =
271277 // scalastyle:off
272278 <table class =" table table-bordered" style =" width: auto" >
273279 <thead >
274- <tr ><th ></th ><th >Timelines </th ><th >Histograms </th ></tr >
280+ <tr ><th style = " width: 160px; " ></th ><th style = " width: 492px; " >Timelines </th ><th style = " width: 300px; " >Histograms </th ></tr >
275281 </thead >
276282 <tbody >
277283 <tr >
278- <td style =" vertical-align: middle; width: 200px;" >
284+ <td style =" vertical-align: middle; width: 160px;" >
285+ <div style =" width: 160px;" >
279286 <div >
280287 <span onclick ={Unparsed (triangleJs)}>{Unparsed (BLACK_RIGHT_TRIANGLE_HTML )}</span >
281288 <strong >Input Rate </strong >
282289 </div >
283290 <div >Avg : {eventRateForAllReceivers.formattedAvg} events/ sec</div >
291+ </div >
284292 </td >
285293 <td class =" timeline" >{timelineDataForEventRateOfAllReceivers}</td >
286294 <td class =" distribution" >{distributionDataForEventRateOfAllReceivers}</td >
@@ -325,19 +333,19 @@ private[ui] class StreamingPage(parent: StreamingTab)
325333 jsCollector : JsCollector ,
326334 minX : Long ,
327335 maxX : Long ,
328- minY : Long ,
329- maxY : Long ): Seq [Node ] = {
336+ minY : Double ,
337+ maxY : Double ): Seq [Node ] = {
330338 val content = listener.receivedRecordsDistributions.map { case (receiverId, distribution) =>
331339 generateInputReceiverRow(jsCollector, receiverId, distribution, minX, maxX, minY, maxY)
332340 }.foldLeft[Seq [Node ]](Nil )(_ ++ _)
333341
334342 <table class =" table table-bordered" style =" width: auto" >
335343 <thead >
336344 <tr >
337- <th ></th >
338- <th style =" width: 166.7px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Status </div ></th >
339- <th style =" width: 166.7px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Location </div ></th >
340- <th style =" width: 166.7px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Last Error Time </div ></th >
345+ <th style = " width: 151px; " ></th >
346+ <th style =" width: 167px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Status </div ></th >
347+ <th style =" width: 167px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Location </div ></th >
348+ <th style =" width: 166px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Last Error Time </div ></th >
341349 <th >Last Error Message </th >
342350 </tr >
343351 </thead >
@@ -353,8 +361,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
353361 distribution : Option [Distribution ],
354362 minX : Long ,
355363 maxX : Long ,
356- minY : Long ,
357- maxY : Long ): Seq [Node ] = {
364+ minY : Double ,
365+ maxY : Double ): Seq [Node ] = {
358366 val avgReceiverEvents = distribution.map(_.statCounter.mean.toLong)
359367 val receiverInfo = listener.receiverInfo(receiverId)
360368 val receiverName = receiverInfo.map(_.name).getOrElse(s " Receiver- $receiverId" )
@@ -371,7 +379,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
371379 val receivedRecords = listener.receivedRecordsWithBatchTime.get(receiverId).getOrElse(Seq ())
372380
373381 val timelineForEventRate =
374- TimelineUIData (
382+ new TimelineUIData (
375383 s " receiver- $receiverId-events-timeline " ,
376384 receivedRecords,
377385 minX,
@@ -381,19 +389,21 @@ private[ui] class StreamingPage(parent: StreamingTab)
381389 " events/sec" ).toHtml(jsCollector)
382390
383391 val distributionForEventsRate =
384- DistributionUIData (
392+ new DistributionUIData (
385393 s " receiver- $receiverId-events-distribution " ,
386394 receivedRecords.map(_._2),
387395 minY,
388396 maxY,
389397 " events/sec" ).toHtml(jsCollector)
390398
391399 <tr >
392- <td rowspan =" 2" style =" vertical-align: middle; width: 193px;" >
400+ <td rowspan =" 2" style =" vertical-align: middle; width: 151px;" >
401+ <div style =" width: 151px;" >
393402 <div >
394403 <strong >{receiverName}</strong >
395404 </div >
396405 <div >Avg : {avgReceiverEvents.map(_.toString).getOrElse(emptyCell)} events/ sec</div >
406+ </div >
397407 </td >
398408 <td >{receiverActive}</td >
399409 <td >{receiverLocation}</td >
@@ -449,6 +459,16 @@ private[ui] object StreamingPage {
449459 def formatDurationOption (msOption : Option [Long ]): String = {
450460 msOption.map(formatDurationVerbose).getOrElse(emptyCell)
451461 }
462+
463+ def convertToTimeUnit (milliseconds : Long , unit : TimeUnit ): Double = unit match {
464+ case TimeUnit .NANOSECONDS => milliseconds * 1000 * 1000 // not used yet
465+ case TimeUnit .MICROSECONDS => milliseconds * 1000 // not used yet
466+ case TimeUnit .MILLISECONDS => milliseconds
467+ case TimeUnit .SECONDS => milliseconds / 1000.0
468+ case TimeUnit .MINUTES => milliseconds / 1000.0 / 60.0
469+ case TimeUnit .HOURS => milliseconds / 1000.0 / 60.0 / 60.0
470+ case TimeUnit .DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
471+ }
452472}
453473
454474/**
0 commit comments