@@ -19,16 +19,77 @@ package org.apache.spark.ui.storage
1919
2020import javax .servlet .http .HttpServletRequest
2121
22+ import org .json4s .JsonAST .JNothing
23+
2224import scala .xml .Node
2325
24- import org .apache .spark .storage .{BlockId , BlockStatus , StorageStatus , StorageUtils }
26+ import org .json4s .JValue
27+ import org .json4s .JsonDSL ._
28+
29+ import org .apache .spark .storage ._
2530import org .apache .spark .ui .{WebUIPage , UIUtils }
2631import org .apache .spark .util .Utils
2732
2833/** Page showing storage details for a given RDD */
2934private [ui] class RDDPage (parent : StorageTab ) extends WebUIPage (" rdd" ) {
3035 private val listener = parent.listener
3136
37+ override def renderJson (request : HttpServletRequest ): JValue = {
38+ val rddId = request.getParameter(" id" ).toInt
39+ val storageStatusList = listener.storageStatusList
40+ val rddInfoOpt = listener.rddInfoList.find(_.id == rddId)
41+
42+ var retVal : JValue = JNothing
43+
44+ if (rddInfoOpt.isDefined) {
45+ val rddInfo = rddInfoOpt.get
46+
47+ val rddSummaryJson = (" RDD Summary" ->
48+ (" RDD ID" -> rddId) ~
49+ (" Storage Level" -> rddInfo.storageLevel.description) ~
50+ (" Cached Partitions" -> rddInfo.numCachedPartitions) ~
51+ (" Total Partitions" -> rddInfo.numPartitions) ~
52+ (" Memory Size" -> rddInfo.memSize) ~
53+ (" Disk Size" -> rddInfo.diskSize))
54+
55+ val dataDistributionList = storageStatusList.map {
56+ case status : StorageStatus =>
57+ (" Host" -> (status.blockManagerId.host + " :" + status.blockManagerId.port)) ~
58+ (" Memory Usage" -> status.memUsedByRdd(rddId)) ~
59+ (" Memory Remaining" -> status.memRemaining) ~
60+ (" Disk Usage" -> status.diskUsedByRdd(rddId))
61+ }
62+
63+ val dataDistributionJson = (" Data Distribution" -> dataDistributionList)
64+
65+ val blockLocations = StorageUtils .getRddBlockLocations(rddId, storageStatusList)
66+ val blocks = storageStatusList
67+ .flatMap(_.rddBlocksById(rddId))
68+ .sortWith(_._1.name < _._1.name)
69+ .map { case (blockId, status) =>
70+ (blockId, status, blockLocations.get(blockId).getOrElse(Seq [String ](" Unknown" )))
71+ }
72+ val partitionList = blocks.map {
73+ case (id : BlockId , block : BlockStatus , locations : Seq [String ]) =>
74+ (" Block Name" -> id.toString) ~
75+ (" Storage Level" -> block.storageLevel.description) ~
76+ (" Size in Memory" -> block.memSize) ~
77+ (" Size on Disk" -> block.diskSize) ~
78+ (" Executors" -> locations)
79+ }
80+ val partitionsJson = (" Partitions" -> partitionList)
81+
82+
83+ retVal =
84+ (" RDD Info" ->
85+ rddSummaryJson ~
86+ dataDistributionJson ~
87+ partitionsJson
88+ )
89+ }
90+ retVal
91+ }
92+
3293 def render (request : HttpServletRequest ): Seq [Node ] = {
3394 val rddId = request.getParameter(" id" ).toInt
3495 val storageStatusList = listener.storageStatusList
0 commit comments