[SPARK-17019][CORE] Expose on-heap and off-heap memory usage in various places

## What changes were proposed in this pull request?

With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), Spark supports persisting data in off-heap memory, but the usage of on-heap and off-heap memory is not currently exposed, which makes it inconvenient for users to monitor and profile. This change proposes to expose off-heap as well as on-heap memory usage in the following places:
1. Spark UI's executor page will display both on-heap and off-heap memory usage.
2. REST requests return both on-heap and off-heap memory usage.
3. The same usage is also reported through the MetricsSystem.
4. The usage can be obtained programmatically from a SparkListener (see the sketch after this list).
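
For illustration, a minimal sketch (not part of this patch) of consuming the new values from a listener; it relies only on the `maxOnHeapMem`/`maxOffHeapMem` options added to `SparkListenerBlockManagerAdded` in this change:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

class MemoryReportingListener extends SparkListener {
  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
    // Old event logs replayed without the new fields yield None, so default to 0.
    val onHeap = event.maxOnHeapMem.getOrElse(0L)
    val offHeap = event.maxOffHeapMem.getOrElse(0L)
    println(s"${event.blockManagerId}: on-heap=$onHeap off-heap=$offHeap total=${event.maxMem}")
  }
}

// Register it like any other listener, e.g. via sc.addSparkListener(new MemoryReportingListener)
// or the spark.extraListeners configuration.
```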

The UI changes are shown below:

![screen shot 2016-08-12 at 11 20 44 am](https://cloud.githubusercontent.com/assets/850797/17612032/6c2f4480-607f-11e6-82e8-a27fb8cbb4ae.png)

Backward compatibility is also considered for the event log and the REST API. Old event logs can still be replayed, with off-heap usage displayed as 0. The REST API only adds new fields, so JSON backward compatibility is preserved.
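
As a rough sketch (not part of this patch), the new executor fields can be read over the monitoring REST API; the port, application id, and use of `scala.io.Source` below are assumptions for illustration, while the field names match the additions to `ExecutorSummary`:

```scala
import scala.io.Source

object ExecutorMemoryCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder application id and UI port; adjust for a real application.
    val appId = args.headOption.getOrElse("app-20170406-0001")
    val src = Source.fromURL(s"http://localhost:4040/api/v1/applications/$appId/executors")
    try {
      // The response now carries onHeapMemoryUsed, offHeapMemoryUsed,
      // maxOnHeapMemory and maxOffHeapMemory alongside the existing maxMemory.
      println(src.mkString)
    } finally {
      src.close()
    }
  }
}
```
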
## How was this patch tested?

Unit tests were added, along with manual verification of the UI changes.

Author: jerryshao <[email protected]>

Closes #14617 from jerryshao/SPARK-17019.
jerryshao authored and squito committed Apr 6, 2017
1 parent 5a693b4 commit a449162
Showing 23 changed files with 638 additions and 119 deletions.
@@ -24,7 +24,15 @@ <h4 style="clear: left; display: inline-block;">Summary</h4>
<th></th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
</th>
<th class="on_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
</th>
<th class="off_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
</th>
<th>Disk Used</th>
<th>Cores</th>
@@ -73,6 +81,14 @@ <h4 style="clear: left; display: inline-block;">Executors</h4>
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
Storage Memory</span></th>
<th class="on_heap_memory">
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
On Heap Storage Memory</span></th>
<th class="off_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
103 changes: 101 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -190,6 +190,10 @@ $(document).ready(function () {
var allRDDBlocks = 0;
var allMemoryUsed = 0;
var allMaxMemory = 0;
var allOnHeapMemoryUsed = 0;
var allOnHeapMaxMemory = 0;
var allOffHeapMemoryUsed = 0;
var allOffHeapMaxMemory = 0;
var allDiskUsed = 0;
var allTotalCores = 0;
var allMaxTasks = 0;
@@ -208,6 +212,10 @@ $(document).ready(function () {
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
var activeMaxMemory = 0;
var activeOnHeapMemoryUsed = 0;
var activeOnHeapMaxMemory = 0;
var activeOffHeapMemoryUsed = 0;
var activeOffHeapMaxMemory = 0;
var activeDiskUsed = 0;
var activeTotalCores = 0;
var activeMaxTasks = 0;
@@ -226,6 +234,10 @@ $(document).ready(function () {
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
var deadMaxMemory = 0;
var deadOnHeapMemoryUsed = 0;
var deadOnHeapMaxMemory = 0;
var deadOffHeapMemoryUsed = 0;
var deadOffHeapMaxMemory = 0;
var deadDiskUsed = 0;
var deadTotalCores = 0;
var deadMaxTasks = 0;
@@ -240,11 +252,22 @@ $(document).ready(function () {
var deadTotalShuffleWrite = 0;
var deadTotalBlacklisted = 0;

response.forEach(function (exec) {
exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
});

response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
allMemoryUsed += exec.memoryUsed;
allMaxMemory += exec.maxMemory;
allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
allOnHeapMaxMemory += exec.maxOnHeapMemory;
allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
allOffHeapMaxMemory += exec.maxOffHeapMemory;
allDiskUsed += exec.diskUsed;
allTotalCores += exec.totalCores;
allMaxTasks += exec.maxTasks;
@@ -263,6 +286,10 @@ $(document).ready(function () {
activeRDDBlocks += exec.rddBlocks;
activeMemoryUsed += exec.memoryUsed;
activeMaxMemory += exec.maxMemory;
activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
activeOnHeapMaxMemory += exec.maxOnHeapMemory;
activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
activeOffHeapMaxMemory += exec.maxOffHeapMemory;
activeDiskUsed += exec.diskUsed;
activeTotalCores += exec.totalCores;
activeMaxTasks += exec.maxTasks;
@@ -281,6 +308,10 @@ $(document).ready(function () {
deadRDDBlocks += exec.rddBlocks;
deadMemoryUsed += exec.memoryUsed;
deadMaxMemory += exec.maxMemory;
deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
deadOnHeapMaxMemory += exec.maxOnHeapMemory;
deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
deadOffHeapMaxMemory += exec.maxOffHeapMemory;
deadDiskUsed += exec.diskUsed;
deadTotalCores += exec.totalCores;
deadMaxTasks += exec.maxTasks;
@@ -302,6 +333,10 @@ $(document).ready(function () {
"allRDDBlocks": allRDDBlocks,
"allMemoryUsed": allMemoryUsed,
"allMaxMemory": allMaxMemory,
"allOnHeapMemoryUsed": allOnHeapMemoryUsed,
"allOnHeapMaxMemory": allOnHeapMaxMemory,
"allOffHeapMemoryUsed": allOffHeapMemoryUsed,
"allOffHeapMaxMemory": allOffHeapMaxMemory,
"allDiskUsed": allDiskUsed,
"allTotalCores": allTotalCores,
"allMaxTasks": allMaxTasks,
@@ -321,6 +356,10 @@ $(document).ready(function () {
"allRDDBlocks": activeRDDBlocks,
"allMemoryUsed": activeMemoryUsed,
"allMaxMemory": activeMaxMemory,
"allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
"allOnHeapMaxMemory": activeOnHeapMaxMemory,
"allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
"allOffHeapMaxMemory": activeOffHeapMaxMemory,
"allDiskUsed": activeDiskUsed,
"allTotalCores": activeTotalCores,
"allMaxTasks": activeMaxTasks,
@@ -340,6 +379,10 @@ $(document).ready(function () {
"allRDDBlocks": deadRDDBlocks,
"allMemoryUsed": deadMemoryUsed,
"allMaxMemory": deadMaxMemory,
"allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
"allOnHeapMaxMemory": deadOnHeapMaxMemory,
"allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
"allOffHeapMaxMemory": deadOffHeapMaxMemory,
"allDiskUsed": deadDiskUsed,
"allTotalCores": deadTotalCores,
"allMaxTasks": deadMaxTasks,
@@ -378,7 +421,35 @@ $(document).ready(function () {
{data: 'rddBlocks'},
{
data: function (row, type) {
return type === 'display' ? (formatBytes(row.memoryUsed, type) + ' / ' + formatBytes(row.maxMemory, type)) : row.memoryUsed;
if (type !== 'display')
return row.memoryUsed;
else
return (formatBytes(row.memoryUsed, type) + ' / ' +
formatBytes(row.maxMemory, type));
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.onHeapMemoryUsed;
else
return (formatBytes(row.onHeapMemoryUsed, type) + ' / ' +
formatBytes(row.maxOnHeapMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('on_heap_memory')
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.offHeapMemoryUsed;
else
return (formatBytes(row.offHeapMemoryUsed, type) + ' / ' +
formatBytes(row.maxOffHeapMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('off_heap_memory')
}
},
{data: 'diskUsed', render: formatBytes},
@@ -450,7 +521,35 @@ $(document).ready(function () {
{data: 'allRDDBlocks'},
{
data: function (row, type) {
return type === 'display' ? (formatBytes(row.allMemoryUsed, type) + ' / ' + formatBytes(row.allMaxMemory, type)) : row.allMemoryUsed;
if (type !== 'display')
return row.allMemoryUsed;
else
return (formatBytes(row.allMemoryUsed, type) + ' / ' +
formatBytes(row.allMaxMemory, type));
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.allOnHeapMemoryUsed;
else
return (formatBytes(row.allOnHeapMemoryUsed, type) + ' / ' +
formatBytes(row.allOnHeapMaxMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('on_heap_memory')
}
},
{
data: function (row, type) {
if (type !== 'display')
return row.allOffHeapMemoryUsed;
else
return (formatBytes(row.allOffHeapMemoryUsed, type) + ' / ' +
formatBytes(row.allOffHeapMaxMemory, type));
},
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
$(nTd).addClass('off_heap_memory')
}
},
{data: 'allDiskUsed', render: formatBytes},
3 changes: 2 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -205,7 +205,8 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
.serialization_time, .getting_result_time, .peak_execution_memory {
.serialization_time, .getting_result_time, .peak_execution_memory,
.on_heap_memory, .off_heap_memory {
display: none;
}

@@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent
case class SparkListenerBlockManagerAdded(
time: Long,
blockManagerId: BlockManagerId,
maxMem: Long,
maxOnHeapMem: Option[Long] = None,
maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent {
}

@DeveloperApi
case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
@@ -70,7 +70,13 @@ private[spark] object AllRDDResource {
address = status.blockManagerId.hostPort,
memoryUsed = status.memUsedByRdd(rddId),
memoryRemaining = status.memRemaining,
diskUsed = status.diskUsedByRdd(rddId)
diskUsed = status.diskUsedByRdd(rddId),
onHeapMemoryUsed = Some(
if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
offHeapMemoryUsed = Some(
if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
onHeapMemoryRemaining = status.onHeapMemRemaining,
offHeapMemoryRemaining = status.offHeapMemRemaining
) } )
} else {
None
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -75,7 +75,11 @@ class ExecutorSummary private[spark](
val totalShuffleWrite: Long,
val isBlacklisted: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String])
val executorLogs: Map[String, String],
val onHeapMemoryUsed: Option[Long],
val offHeapMemoryUsed: Option[Long],
val maxOnHeapMemory: Option[Long],
val maxOffHeapMemory: Option[Long])

class JobData private[spark](
val jobId: Int,
@@ -111,7 +115,11 @@ class RDDDataDistribution private[spark](
val address: String,
val memoryUsed: Long,
val memoryRemaining: Long,
val diskUsed: Long)
val diskUsed: Long,
val onHeapMemoryUsed: Option[Long],
val offHeapMemoryUsed: Option[Long],
val onHeapMemoryRemaining: Option[Long],
val offHeapMemoryRemaining: Option[Long])

class RDDPartitionInfo private[spark](
val blockName: String,
@@ -150,8 +150,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory =
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -229,7 +229,8 @@

val idFromMaster = master.registerBlockManager(
id,
maxMemory,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint)

blockManagerId = if (idFromMaster != null) idFromMaster else id
@@ -307,7 +308,7 @@
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
reportAllBlocks()
}

@@ -57,11 +57,12 @@ class BlockManagerMaster(
*/
def registerBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
val updatedId = driverEndpoint.askSync[BlockManagerId](
RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}
@@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint(
logInfo("BlockManagerMasterEndpoint up")

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -276,7 +276,8 @@

private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}

@@ -338,7 +339,8 @@
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
maxMemSize: Long,
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
// the dummy id is not expected to contain the topology information.
// we get that info here and respond back with a more fleshed out block manager id
@@ -359,14 +361,15 @@
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
id.hostPort, Utils.bytesToString(maxMemSize), id))
id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))

blockManagerIdByExecutor(id.executorId) = id

blockManagerInfo(id) = new BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}

@@ -464,10 +467,13 @@ object BlockStatus {
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val maxOnHeapMem: Long,
val maxOffHeapMem: Long,
val slaveEndpoint: RpcEndpointRef)
extends Logging {

val maxMem = maxOnHeapMem + maxOffHeapMem

private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem

(Diffs for the remaining changed files are not shown.)
