From 2459342194e41c2492cc3d69056a6c9a947475ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=94=B0=E7=94=B000222924?= Date: Fri, 25 Jan 2019 16:58:53 +0800 Subject: [PATCH 1/7] [SPARK-26726] The amount of memory used by the broadcast variable is not synchronized to the UI display --- .../spark/status/AppStatusListener.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 1067cdc84ceb..eb70ff3a6301 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -827,6 +827,7 @@ private[spark] class AppStatusListener( event.blockUpdatedInfo.blockId match { case block: RDDBlockId => updateRDDBlock(event, block) case stream: StreamBlockId => updateStreamBlock(event, stream) + case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } } @@ -995,6 +996,30 @@ private[spark] class AppStatusListener( } } + def updateBroadcastBlock(event: SparkListenerBlockUpdated, broadcast: BroadcastBlockId): Unit = { + val executorId = event.blockUpdatedInfo.blockManagerId.executorId + val storageLevel = event.blockUpdatedInfo.storageLevel + + // Whether values are being added to or removed from the existing accounting. + val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) + val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) + + // Function to apply a delta to a value, but ensure that it doesn't go negative. + def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) + + liveExecutors.get(executorId).foreach { exec => + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta) + } else { + exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta) + } + } + exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) + exec.diskUsed = newValue(exec.diskUsed, diskDelta) + } + } + private def getOrCreateStage(info: StageInfo): LiveStage = { val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber), new Function[(Int, Int), LiveStage]() { From a51ea3e10ec2487322e9fc7ba18e5cbe8ebc30c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=94=B0=E7=94=B000222924?= Date: Mon, 28 Jan 2019 15:51:34 +0800 Subject: [PATCH 2/7] add the test case --- .../spark/status/AppStatusListener.scala | 8 ++++++-- .../spark/status/AppStatusListenerSuite.scala | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index eb70ff3a6301..7ae29726ad11 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -996,7 +996,9 @@ private[spark] class AppStatusListener( } } - def updateBroadcastBlock(event: SparkListenerBlockUpdated, broadcast: BroadcastBlockId): Unit = { + private def updateBroadcastBlock(event: SparkListenerBlockUpdated, + broadcast: BroadcastBlockId): Unit = { + val now = System.nanoTime() val executorId = event.blockUpdatedInfo.blockManagerId.executorId val storageLevel = event.blockUpdatedInfo.storageLevel @@ -1007,7 +1009,8 @@ private[spark] class AppStatusListener( // Function to apply a delta to a value, but ensure that it doesn't go negative. def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) - liveExecutors.get(executorId).foreach { exec => + val maybeExec = liveExecutors.get(executorId) + maybeExec.foreach { exec => if (exec.hasMemoryInfo) { if (storageLevel.useOffHeap) { exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta) @@ -1017,6 +1020,7 @@ private[spark] class AppStatusListener( } exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) exec.diskUsed = newValue(exec.diskUsed, diskDelta) + maybeUpdate(exec, now) } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index ede9466b3d63..e367b3034a52 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -938,6 +938,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { intercept[NoSuchElementException] { check[StreamBlockData](stream1.name) { _ => () } } + + // Update a BroadcastBlock. + val broadcast1 = BroadcastBlockId(1L) + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, broadcast1, level, 1L, 1L))) + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.memoryUsed === 1L) + assert(exec.info.diskUsed === 1L) + } + + // Drop a BroadcastBlock. + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, broadcast1, StorageLevel.NONE, 1L, 1L))) + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.memoryUsed === 0) + assert(exec.info.diskUsed === 0) + } } test("eviction of old data") { From e9d0f18d2ebeba5879b79f38c4ee7f942624aba4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=94=B0=E7=94=B000222924?= Date: Tue, 29 Jan 2019 14:25:51 +0800 Subject: [PATCH 3/7] Improve code modification --- .../spark/status/AppStatusListener.scala | 57 ++++++++----------- 1 file changed, 24 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 7ae29726ad11..4428fd91a64e 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -759,18 +759,10 @@ private[spark] class AppStatusListener( // Use RDD distribution to update executor memory and disk usage info. liveRDD.getDistributions().foreach { case (executorId, rddDist) => - liveExecutors.get(executorId).foreach { exec => - if (exec.hasMemoryInfo) { - if (storageLevel.useOffHeap) { - exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed) - } else { - exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed) - } - } - exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed) - exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed) - maybeUpdate(exec, now) - } + val maybeExec = liveExecutors.get(executorId) + updateExecutorMemoryDiskUsed(maybeExec, storageLevel, -rddDist.memoryUsed, + -rddDist.diskUsed, Option(-rddDist.offHeapUsed), Option(-rddDist.onHeapUsed)) + maybeExec.foreach(exec => maybeUpdate(exec, now)) } } @@ -885,17 +877,7 @@ private[spark] class AppStatusListener( // Update the executor stats first, since they are used to calculate the free memory // on tracked RDD distributions. - maybeExec.foreach { exec => - if (exec.hasMemoryInfo) { - if (storageLevel.useOffHeap) { - exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) - } else { - exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) - } - } - exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) - } + updateExecutorMemoryDiskUsed(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) // Update the block entry in the RDD info, keeping track of the deltas above so that we // can update the executor information too. @@ -996,8 +978,9 @@ private[spark] class AppStatusListener( } } - private def updateBroadcastBlock(event: SparkListenerBlockUpdated, - broadcast: BroadcastBlockId): Unit = { + private def updateBroadcastBlock( + event: SparkListenerBlockUpdated, + broadcast: BroadcastBlockId): Unit = { val now = System.nanoTime() val executorId = event.blockUpdatedInfo.blockManagerId.executorId val storageLevel = event.blockUpdatedInfo.storageLevel @@ -1006,21 +989,29 @@ private[spark] class AppStatusListener( val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) - // Function to apply a delta to a value, but ensure that it doesn't go negative. - def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) - val maybeExec = liveExecutors.get(executorId) + updateExecutorMemoryDiskUsed(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) + maybeExec.foreach(exec => maybeUpdate(exec, now)) + + } + // update executor memory and disk usage info + private def updateExecutorMemoryDiskUsed( + maybeExec: Option[LiveExecutor], + storageLevel: StorageLevel, + memoryDelta: Long, + diskDelta: Long, + OffHeapDelta: Option[Long], + OnHeapDelta: Option[Long]): Unit = { maybeExec.foreach { exec => if (exec.hasMemoryInfo) { if (storageLevel.useOffHeap) { - exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta) + exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, OffHeapDelta.getOrElse(memoryDelta)) } else { - exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta) + exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, OnHeapDelta.getOrElse(memoryDelta)) } } - exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = newValue(exec.diskUsed, diskDelta) - maybeUpdate(exec, now) + exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) + exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) } } From 53cbc0be04cd5a2a8410dccf00ac0ca3cc8b370c Mon Sep 17 00:00:00 2001 From: "han.tiantian@zte.com.cn" Date: Tue, 29 Jan 2019 14:35:51 +0800 Subject: [PATCH 4/7] Update the code --- .../org/apache/spark/status/AppStatusListener.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 4428fd91a64e..f1918fbca0d1 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -760,7 +760,7 @@ private[spark] class AppStatusListener( // Use RDD distribution to update executor memory and disk usage info. liveRDD.getDistributions().foreach { case (executorId, rddDist) => val maybeExec = liveExecutors.get(executorId) - updateExecutorMemoryDiskUsed(maybeExec, storageLevel, -rddDist.memoryUsed, + updateExecutorMemoryDiskInfo(maybeExec, storageLevel, -rddDist.memoryUsed, -rddDist.diskUsed, Option(-rddDist.offHeapUsed), Option(-rddDist.onHeapUsed)) maybeExec.foreach(exec => maybeUpdate(exec, now)) } @@ -877,7 +877,7 @@ private[spark] class AppStatusListener( // Update the executor stats first, since they are used to calculate the free memory // on tracked RDD distributions. - updateExecutorMemoryDiskUsed(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) + updateExecutorMemoryDiskInfo(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) // Update the block entry in the RDD info, keeping track of the deltas above so that we // can update the executor information too. @@ -990,12 +990,13 @@ private[spark] class AppStatusListener( val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) val maybeExec = liveExecutors.get(executorId) - updateExecutorMemoryDiskUsed(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) + updateExecutorMemoryDiskInfo(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) maybeExec.foreach(exec => maybeUpdate(exec, now)) } + // update executor memory and disk usage info - private def updateExecutorMemoryDiskUsed( + private def updateExecutorMemoryDiskInfo( maybeExec: Option[LiveExecutor], storageLevel: StorageLevel, memoryDelta: Long, From b4d6996179e9b96cf1a92f2058d376bed46fe3b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=94=B0=E7=94=B000222924?= Date: Wed, 30 Jan 2019 14:21:59 +0800 Subject: [PATCH 5/7] Update the code --- .../spark/status/AppStatusListener.scala | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f1918fbca0d1..21110421dd2a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -759,11 +759,20 @@ private[spark] class AppStatusListener( // Use RDD distribution to update executor memory and disk usage info. liveRDD.getDistributions().foreach { case (executorId, rddDist) => - val maybeExec = liveExecutors.get(executorId) - updateExecutorMemoryDiskInfo(maybeExec, storageLevel, -rddDist.memoryUsed, - -rddDist.diskUsed, Option(-rddDist.offHeapUsed), Option(-rddDist.onHeapUsed)) - maybeExec.foreach(exec => maybeUpdate(exec, now)) + liveExecutors.get(executorId).foreach { exec => + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed) + } else { + exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed) + } + } + exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed) + exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed) + maybeUpdate(exec, now) + } } + } kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) @@ -877,7 +886,9 @@ private[spark] class AppStatusListener( // Update the executor stats first, since they are used to calculate the free memory // on tracked RDD distributions. - updateExecutorMemoryDiskInfo(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) + maybeExec.foreach { exec => + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) + } // Update the block entry in the RDD info, keeping track of the deltas above so that we // can update the executor information too. @@ -989,31 +1000,26 @@ private[spark] class AppStatusListener( val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) - val maybeExec = liveExecutors.get(executorId) - updateExecutorMemoryDiskInfo(maybeExec, storageLevel, memoryDelta, diskDelta, None, None) - maybeExec.foreach(exec => maybeUpdate(exec, now)) - + liveExecutors.get(executorId).foreach { exec => + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) + maybeUpdate(exec, now) + } } - - // update executor memory and disk usage info + private def updateExecutorMemoryDiskInfo( - maybeExec: Option[LiveExecutor], + exec: LiveExecutor, storageLevel: StorageLevel, memoryDelta: Long, - diskDelta: Long, - OffHeapDelta: Option[Long], - OnHeapDelta: Option[Long]): Unit = { - maybeExec.foreach { exec => - if (exec.hasMemoryInfo) { - if (storageLevel.useOffHeap) { - exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, OffHeapDelta.getOrElse(memoryDelta)) - } else { - exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, OnHeapDelta.getOrElse(memoryDelta)) - } + diskDelta: Long): Unit = { + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) + } else { + exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) } - exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) } + exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) + exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) } private def getOrCreateStage(info: StageInfo): LiveStage = { From 221bf7741d4bc348cf9ec2211dcb4bd64af8b75b Mon Sep 17 00:00:00 2001 From: "han.tiantian@zte.com.cn" Date: Wed, 30 Jan 2019 14:25:20 +0800 Subject: [PATCH 6/7] Update AppStatusListener.scala --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 21110421dd2a..7076c8c40432 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -772,7 +772,6 @@ private[spark] class AppStatusListener( maybeUpdate(exec, now) } } - } kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) From 2024976b473665ff5ebde3f1ae68ce180fd54ae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E7=94=B0=E7=94=B000222924?= Date: Thu, 31 Jan 2019 09:52:52 +0800 Subject: [PATCH 7/7] Update AppStatusListener.scala --- .../org/apache/spark/status/AppStatusListener.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 7076c8c40432..4da2ae289a2d 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -991,15 +991,15 @@ private[spark] class AppStatusListener( private def updateBroadcastBlock( event: SparkListenerBlockUpdated, broadcast: BroadcastBlockId): Unit = { - val now = System.nanoTime() val executorId = event.blockUpdatedInfo.blockManagerId.executorId - val storageLevel = event.blockUpdatedInfo.storageLevel + liveExecutors.get(executorId).foreach { exec => + val now = System.nanoTime() + val storageLevel = event.blockUpdatedInfo.storageLevel - // Whether values are being added to or removed from the existing accounting. - val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) - val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) + // Whether values are being added to or removed from the existing accounting. + val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) + val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) - liveExecutors.get(executorId).foreach { exec => updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) maybeUpdate(exec, now) }