[SPARK-20776] Fix perf. problems in JobProgressListener caused by TaskMetrics construction #18008
Conversation
```scala
import InternalAccumulator._
@transient private[spark] lazy val nameToAccums = LinkedHashMap(
```
It looks like the use of `LinkedHashMap` was added by @cloud-fan in #12612 in order to preserve ordering from the old code. As far as I can tell, we don't actually rely on the ordering of the entries in this map, so I didn't preserve the use of `LinkedHashMap`.
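As a side note, the only behavioral difference between the two maps is iteration order. A minimal sketch (hypothetical keys and values, not from the Spark source):

```scala
import scala.collection.mutable

// mutable.LinkedHashMap iterates in insertion order; mutable.HashMap makes
// no ordering guarantee. If no caller relies on iteration order, the
// cheaper HashMap is a safe replacement.
val linked = mutable.LinkedHashMap("c" -> 3, "a" -> 1, "b" -> 2)
val insertionOrder = linked.keys.toList  // guaranteed List("c", "a", "b")

// Lookups are unaffected by the choice of map implementation:
val plain = mutable.HashMap("c" -> 3, "a" -> 1, "b" -> 2)
val lookup = plain("a")
```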
```scala
testAccum.foreach { accum =>
  map.put(TEST_ACCUM, accum)
}
map.asScala
```
The map-plus-wrapper might consume a little extra memory compared to the old code, but that doesn't matter because we don't have many TaskMetrics resident in the JVM at the same time. On the executor, the only instances live in TaskContexts; on the driver, there is only one per stage in the scheduler, plus some temporary ones in the listener bus queue, which are freed as soon as the queue events are processed (and that now happens faster, outweighing the extra space usage).
Actually, stepping back a second, we might be able to completely remove this bottleneck by simply not constructing tons of empty TaskMetrics objects in JobProgressListener's hot path. Let me see if I can update the PR to do that instead.
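The general shape of that optimization can be sketched as follows; the names (`MetricsSnapshot`, `HotPathListener`) are hypothetical stand-ins, not Spark's actual classes:

```scala
// Instead of allocating a fresh "empty" metrics object for every event,
// keep one shared immutable instance and hand it out from the hot path.
final case class MetricsSnapshot(recordsRead: Long, bytesRead: Long)

object MetricsSnapshot {
  // Allocated once; safe to share because it is immutable.
  val Empty: MetricsSnapshot = MetricsSnapshot(0L, 0L)
}

class HotPathListener {
  private var latest: MetricsSnapshot = MetricsSnapshot.Empty

  def onTaskStart(): Unit = {
    // No MetricsSnapshot allocation here: every start event reuses Empty.
    latest = MetricsSnapshot.Empty
  }

  def current: MetricsSnapshot = latest
}
```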
```scala
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}

val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
```
Important note here: in the old code, the elseUpdate branch would only be taken in rare error cases where we had somehow purged the TaskUIData that should have been created when the task launched. It technically doesn't matter what we put in for the `Option[Metrics]` here, since it just gets unconditionally overwritten on line 410 of the old code. So while my new code constructs TaskUIData with default metrics, it doesn't actually change the behavior of this block.
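For reference, `getOrElseUpdate` only evaluates and inserts its default when the key is absent, which is why an always-overwritten fallback value is harmless. A small sketch with hypothetical data:

```scala
import scala.collection.mutable

val taskData = mutable.HashMap[Long, String]()
taskData.put(1L, "existing")

// Key present: the default is ignored (and not even evaluated).
val hit = taskData.getOrElseUpdate(1L, "default")

// Key absent: the default is inserted and returned.
val miss = taskData.getOrElseUpdate(2L, "default")
```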
```scala
private var _metrics: Option[TaskMetricsUIData]) {
class TaskUIData private(private var _taskInfo: TaskInfo) {

private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
```
when will this be None?
The only way for this to become None is if updateTaskMetrics is called with None.
updateTaskMetrics is called in two places:
- In JobProgressListener.onTaskEnd, where the metrics come from `Option(taskEnd.taskMetrics)`, and `taskEnd.taskMetrics` can be `null` in case the task has failed (according to the docs).
- In JobProgressListener.onExecutorMetricsUpdate, where the metrics are guaranteed to be defined / non-None.
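The null-to-None conversion here relies on the `Option.apply` factory; a quick sketch (the nullable values below are hypothetical stand-ins for `taskEnd.taskMetrics`):

```scala
// Option(x) returns None when x is null and Some(x) otherwise, so
// Option(taskEnd.taskMetrics) is None exactly when the task failed and
// the metrics field was left null.
val failedMetrics: String = null          // stand-in for a failed task's metrics
val successMetrics: String = "metrics"    // stand-in for a successful task's metrics

val maybeFailed = Option(failedMetrics)
val maybeSuccess = Option(successMetrics)
```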
LGTM
Test build #76984 has finished for PR 18008 at commit

Test build #76988 has finished for PR 18008 at commit

Test build #76990 has finished for PR 18008 at commit

Test build #76991 has finished for PR 18008 at commit
thanks, merging to master/2.2!
…kMetrics construction

Author: Josh Rosen <[email protected]>

Closes #18008 from JoshRosen/nametoaccums-improvements.

(cherry picked from commit 30e0557)
Signed-off-by: Wenchen Fan <[email protected]>
@JoshRosen, what's the tool in your screenshot?
@witgo, I'm using YourKit Java Profiler 2016.02. In these screenshots I enabled CPU sampling, then took a performance snapshot and used the per-thread view, focusing on the time taken in the live listener bus thread by right-clicking on the subtree and choosing "focus subtree" from the context menu.
@JoshRosen I see, thank you.
What changes were proposed in this pull request?

In `./bin/spark-shell --master=local[64]` I ran `sc.parallelize(1 to 100000, 100000).count()` and profiled the time spent in the LiveListenerBus event processing thread. I discovered that the majority of the time was being spent in `TaskMetrics.empty` calls in `JobProgressListener.onTaskStart`. It turns out that we can slightly refactor to remove the need to construct one empty instance per call, greatly improving the performance of this code.

The performance gains here help to avoid an issue where listener events would be dropped because the JobProgressListener couldn't keep up with the throughput.

Before: (profiler screenshot)

After: (profiler screenshot)

How was this patch tested?

Benchmarks described above.
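Putting the diff fragments discussed above together, the pattern can be sketched with simplified hypothetical types (`MetricsUI` and a bare `Long` task id stand in for the real Spark classes): metrics default to one shared empty value, and only become `None` if an update explicitly passes `None`.

```scala
final case class MetricsUI(value: Long)

object MetricsUI {
  // Shared empty instance, allocated once (mirrors the role of
  // TaskMetricsUIData.EMPTY in the actual patch).
  val Empty: MetricsUI = MetricsUI(0L)
}

class TaskUIData private (val taskId: Long) {
  // Defaults to Some(Empty), so freshly created tasks show empty metrics
  // without allocating a new metrics object per task.
  private[this] var _metrics: Option[MetricsUI] = Some(MetricsUI.Empty)

  def metrics: Option[MetricsUI] = _metrics

  // Becomes None only when an update explicitly passes None.
  def updateTaskMetrics(m: Option[MetricsUI]): Unit = { _metrics = m }
}

object TaskUIData {
  def apply(taskId: Long): TaskUIData = new TaskUIData(taskId)
}
```

The private constructor plus companion `apply` keeps all construction behind one factory, so the default metrics value is set in exactly one place.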