Skip to content

Commit 52bbae8

Browse files
committed
StorageListener & StorageStatusListener needs to synchronize internally to be thread-safe
1 parent 31c79ce commit 52bbae8

File tree

3 files changed

+10
-3
lines changed

3 files changed

+10
-3
lines changed

core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ private[v1] class AllRDDResource(uiRoot: UIRoot) {
3030
@PathParam("appId") appId: String
3131
): Seq[RDDStorageInfo] = {
3232
uiRoot.withSparkUI(appId) { ui =>
33-
// should all access on storageListener also be synchronized?
3433
val storageStatusList = ui.storageListener.storageStatusList
3534
val rddInfos = ui.storageListener.rddInfoList
3635
rddInfos.map{rddInfo =>

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@ import org.apache.spark.scheduler._
2525
/**
2626
* :: DeveloperApi ::
2727
* A SparkListener that maintains executor storage status.
28+
*
29+
* Unlike JobProgressListener, this class is thread-safe, so users do not need to synchronize
2830
*/
2931
@DeveloperApi
3032
class StorageStatusListener extends SparkListener {
3133
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
3234
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
3335

34-
def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq
36+
def storageStatusList: Seq[StorageStatus] = synchronized {
37+
executorIdToStorageStatus.values.toSeq
38+
}
3539

3640
/** Update storage status list to reflect updated block statuses */
3741
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {

core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag
3535
/**
3636
* :: DeveloperApi ::
3737
* A SparkListener that prepares information to be displayed on the BlockManagerUI.
38+
*
39+
* Unlike JobProgressListener, this class is thread-safe, so users do not need to synchronize
3840
*/
3941
@DeveloperApi
4042
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
@@ -43,7 +45,9 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
4345
def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
4446

4547
/** Filter RDD info to include only those with cached partitions */
46-
def rddInfoList: Seq[RDDInfo] = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
48+
def rddInfoList: Seq[RDDInfo] = synchronized {
49+
_rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
50+
}
4751

4852
/** Update the storage info of the RDDs whose blocks are among the given updated blocks */
4953
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {

0 commit comments

Comments
 (0)