
Conversation

@andrewor14
Contributor

This patch exposes the memory used by internal data structures on the SparkUI. This tracks memory used by all spilling operations and SQL operators backed by Tungsten, e.g. BroadcastHashJoin, ExternalSort, GeneratedAggregate, etc. The metric exposed is "peak execution memory", which broadly refers to the peak in-memory size of each of these data structures.

A separate patch will extend this by linking the new information to the SQL operators themselves.
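The "peak" semantics described above can be sketched as a running counter whose maximum is the only thing ever reported. This is an illustrative model, not Spark's actual implementation; all names here are made up for the sketch:

```java
// Illustrative sketch of a "peak" metric: each allocation/release updates a
// running total, and only the maximum total ever observed is exposed.
// Names (PeakMemoryTracker, peakExecutionMemoryBytes) are hypothetical.
public final class PeakMemoryTracker {
    private long currentBytes = 0L;
    private long peakBytes = 0L;

    public void allocate(long bytes) {
        currentBytes += bytes;
        peakBytes = Math.max(peakBytes, currentBytes);
    }

    public void release(long bytes) {
        currentBytes -= bytes;
    }

    /** The value a UI would display: the high-water mark, not the current usage. */
    public long peakExecutionMemoryBytes() {
        return peakBytes;
    }
}
```

Note that after memory is released the peak does not decrease, which is why the metric reflects the largest footprint a task's data structures ever reached rather than their final size.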

screen shot 2015-07-29 at 7 43 17 pm

screen shot 2015-07-29 at 7 43 05 pm


Andrew Or added 12 commits July 28, 2015 16:36
Currently there is only one accumulator: peak execution memory,
which refers to the sizes of all data structures created in
shuffles, aggregations and joins.
These are now tracked through the execution memory accumulator
for each task.
This is for two reasons:

(1) Accumulators must be created on the driver such that all
executors can use the same accumulator IDs to access the correct
accumulators.

(2) Accumulators should be created on the stage level to allow
us to compare the accumulator values across all tasks within the
stage. This representation is more useful when we expose it to
the UI properly later.
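The two reasons above boil down to one mechanism: the driver assigns globally unique accumulator IDs once per stage, and tasks on any executor report updates against those same IDs. A toy, single-process model (not Spark code; all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of driver-side, stage-level accumulator creation. The driver
// registers the accumulator and hands its ID to every task in the stage;
// executors never mint IDs themselves, so all updates land in one place.
final class AccumulatorRegistry {
    private static final AtomicLong nextId = new AtomicLong(0);
    private final Map<Long, Long> values = new HashMap<>();

    // Driver side: create the accumulator once when the stage is created.
    long register() {
        long id = nextId.getAndIncrement();
        values.put(id, 0L);
        return id;
    }

    // Executor side: each task reports against the driver-assigned ID,
    // so values from all tasks in the stage can be compared and merged.
    void add(long id, long delta) {
        values.merge(id, delta, Long::sum);
    }

    long value(long id) {
        return values.get(id);
    }
}
```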
…etrics

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/Task.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
This was removed in a merge conflict.
…etrics

Conflicts:
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
	sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
This commit makes the UI display internal accumulators differently.
A future commit will add this to the summary metrics table and
add an informative tooltip to explain what the execution memory
means.
@SparkQA

SparkQA commented Jul 30, 2015

Test build #38958 has finished for PR 7770 at commit 5b5e6f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor Author

The information exposed in this patch will be tied to accumulators on the SQL tab introduced in #7774

Contributor


Style nit: in the Tungsten code I prefer to use /** style comments even for private variables since it makes things marginally nicer in IDEs. If we want to be really explicit, we might want to name this peakMemoryUsageBytes.
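Combining both suggestions, the field would read roughly as follows (the class name is made up; `peakMemoryUsageBytes` is the reviewer's proposed name):

```java
// Sketch of the reviewer's two suggestions: a /** ... */ doc comment even on
// a private field (so IDEs surface it), and a name that carries its unit.
public final class UnsafeSorterSketch {
    /** Peak amount of memory used by this sorter so far, in bytes. */
    private long peakMemoryUsageBytes = 0L;

    public long getPeakMemoryUsageBytes() {
        return peakMemoryUsageBytes;
    }
}
```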

Andrew Or added 6 commits July 30, 2015 14:42
This tests whether internal accumulators are actually updated
when real jobs are run. Note that tests do not fully pass yet
because of a weird Java / Scala problem with TaskContext.
Apparently static Scala methods must be fully public for Java.
These tests verify whether the peak execution memory accumulator
is in fact updated by the respective operators. The ones covered
here include: GeneratedAggregate, ExternalSort,
UnsafeExternalSort, BroadcastHashJoin, BroadcastHashOuterJoin,
and BroadcastLeftSemiJoinHash.
Now we also run a dummy stage to ensure that the peak execution
memory accumulator does not already exist.
@andrewor14 andrewor14 changed the title [SPARK-8735] [WIP] [SQL] Expose memory usage for shuffles, joins and aggregations [SPARK-8735] [SQL] Expose memory usage for shuffles, joins and aggregations Jul 31, 2015
Contributor


I don't think that we can change this from collect to match: collect will try to match the pattern anywhere in the query tree, whereas match will only match at the root of the query plan.
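The distinction the reviewer draws can be shown with a minimal tree sketch (not Catalyst's actual API): `collect` recursively searches every node of the plan, while a plain `match` only ever sees the root.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative plan-tree sketch. collect() visits the whole tree, like
// Scala's TreeNode.collect; matchesAtRoot() models a bare `match`, which
// only inspects the node it is applied to.
final class PlanNode {
    final String name;
    final List<PlanNode> children = new ArrayList<>();

    PlanNode(String name, PlanNode... kids) {
        this.name = name;
        for (PlanNode k : kids) children.add(k);
    }

    // Finds matching operators anywhere in the query tree.
    List<String> collect(String target) {
        List<String> hits = new ArrayList<>();
        if (name.equals(target)) hits.add(name);
        for (PlanNode c : children) hits.addAll(c.collect(target));
        return hits;
    }

    // Only succeeds if the root itself is the operator in question.
    boolean matchesAtRoot(String target) {
        return name.equals(target);
    }
}
```

For a plan like `Project -> Filter -> BroadcastHashJoin`, `collect` finds the join but a root-only `match` does not, which is why the change cannot be made safely.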

Andrew Or added 2 commits August 3, 2015 10:54
@JoshRosen
Contributor

LGTM pending Jenkins.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1309 has started for PR 7770 at commit d7df332.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1308 has started for PR 7770 at commit d7df332.

…etrics

Conflicts:
	core/src/main/scala/org/apache/spark/TaskContext.scala
@SparkQA

SparkQA commented Aug 3, 2015

Test build #1307 has finished for PR 7770 at commit d7df332.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

…etrics

Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@squito
Contributor

squito commented Aug 3, 2015

Accumulators handle task failures, but have super confusing semantics around recomputation from shared lineage, speculative execution, and stage retries (e.g., I don't understand the current logic for clearing these new values).

Making this an internal accumulator is certainly "safe" in that it's not technically exposed at all, so we could change it without breaking any contract. I just meant that we seem to be headed to a state where any user SparkListener won't have access to this, other than hardcoding a check for "peakExecutionMemory" into their code, which could break at any time. So a user SparkListener "shouldn't" do that, but we do expose other UI info that way, so it's weird for this one tiny bit to be hidden. And I'm saying that, in general, we seem to be headed to a future where a user SparkListener will get some set of metrics from the strongly typed, developer-API TaskMetrics, and then snoop our internal accumulators for everything else.

In any case, I guess I've said my piece and I'm not convincing anybody.

@pwendell
Contributor

pwendell commented Aug 3, 2015

Hey Imran,

So I think there is a larger design discussion at hand that we can probably break out into a mailing list thread or discuss offline: as we add more metrics, what internal mechanisms do we use, and how do we present them to the user (these could actually be decoupled concerns). We had been going in the direction of moving some of these to accumulators, to avoid some of the clunkiness of the TaskMetrics approach and to avoid duplicate mechanisms for the same things (we discussed along these lines even when doing updates to TaskMetrics way back with @sryza).

However, an open question is how will this affect the user-facing story regarding consumption. For instance, we could have public constants for accumulator keys we plan to support and give them standard naming, if we want them to be caught by MIMA and more formally supported.

I would recommend forking a new thread on the dev list to discuss the trade-offs. As it stands, I don't think this patch represents a long-term commitment in any direction. We could always add these to the TaskMetrics struct later if we wanted to send them back to users that way. We could also have all of TaskMetrics use accumulators internally and still support the current user-facing struct classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1312 has finished for PR 7770 at commit d7df332.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #39568 has finished for PR 7770 at commit f5b0d68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1315 has finished for PR 7770 at commit f5b0d68.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1311 has finished for PR 7770 at commit d7df332.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1310 has finished for PR 7770 at commit d7df332.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1313 has finished for PR 7770 at commit f5b0d68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1316 has finished for PR 7770 at commit f5b0d68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 3, 2015

Test build #39571 has finished for PR 7770 at commit 9abecb9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Now that this has passed tests as of the latest commit, I'm going to merge it to master and branch-1.5. Thanks!

asfgit pushed a commit that referenced this pull request Aug 3, 2015
…ations

This patch exposes the memory used by internal data structures on the SparkUI. This tracks memory used by all spilling operations and SQL operators backed by Tungsten, e.g. `BroadcastHashJoin`, `ExternalSort`, `GeneratedAggregate`, etc. The metric exposed is "peak execution memory", which broadly refers to the peak in-memory size of each of these data structures.

A separate patch will extend this by linking the new information to the SQL operators themselves.

<img width="950" alt="screen shot 2015-07-29 at 7 43 17 pm" src="https://cloud.githubusercontent.com/assets/2133137/8974776/b90fc980-362a-11e5-9e2b-842da75b1641.png">
<img width="802" alt="screen shot 2015-07-29 at 7 43 05 pm" src="https://cloud.githubusercontent.com/assets/2133137/8974777/baa76492-362a-11e5-9b77-e364a6a6b64e.png">

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7770)
<!-- Reviewable:end -->

Author: Andrew Or <[email protected]>

Closes #7770 from andrewor14/expose-memory-metrics and squashes the following commits:

9abecb9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
f5b0d68 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
d7df332 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
8eefbc5 [Andrew Or] Fix non-failing tests
9de2a12 [Andrew Or] Fix tests due to another logical merge conflict
876bfa4 [Andrew Or] Fix failing test after logical merge conflict
361a359 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
40b4802 [Andrew Or] Fix style?
d0fef87 [Andrew Or] Fix tests?
b3b92f6 [Andrew Or] Address comments
0625d73 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
c00a197 [Andrew Or] Fix potential NPEs
10da1cd [Andrew Or] Fix compile
17f4c2d [Andrew Or] Fix compile?
a87b4d0 [Andrew Or] Fix compile?
d70874d [Andrew Or] Fix test compile + address comments
2840b7d [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
6aa2f7a [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
b889a68 [Andrew Or] Minor changes: comments, spacing, style
663a303 [Andrew Or] UnsafeShuffleWriter: update peak memory before close
d090a94 [Andrew Or] Fix style
2480d84 [Andrew Or] Expand test coverage
5f1235b [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
1ecf678 [Andrew Or] Minor changes: comments, style, unused imports
0b6926c [Andrew Or] Oops
111a05e [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
a7a39a5 [Andrew Or] Strengthen presence check for accumulator
a919eb7 [Andrew Or] Add tests for unsafe shuffle writer
23c845d [Andrew Or] Add tests for SQL operators
a757550 [Andrew Or] Address comments
b5c51c1 [Andrew Or] Re-enable test in JavaAPISuite
5107691 [Andrew Or] Add tests for internal accumulators
59231e4 [Andrew Or] Fix tests
9528d09 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
5b5e6f3 [Andrew Or] Add peak execution memory to summary table + tooltip
92b4b6b [Andrew Or] Display peak execution memory on the UI
eee5437 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
d9b9015 [Andrew Or] Track execution memory in unsafe shuffles
770ee54 [Andrew Or] Track execution memory in broadcast joins
9c605a4 [Andrew Or] Track execution memory in GeneratedAggregate
9e824f2 [Andrew Or] Add back execution memory tracking for *ExternalSort
4ef4cb1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
e6c3e2f [Andrew Or] Move internal accumulators creation to Stage
a417592 [Andrew Or] Expose memory metrics in UnsafeExternalSorter
3c4f042 [Andrew Or] Track memory usage in ExternalAppendOnlyMap / ExternalSorter
bd7ab3f [Andrew Or] Add internal accumulators to TaskContext

(cherry picked from commit 702aa9d)
Signed-off-by: Josh Rosen <[email protected]>
@SparkQA

SparkQA commented Aug 3, 2015

Test build #1314 has finished for PR 7770 at commit f5b0d68.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 702aa9d Aug 3, 2015
@squito
Contributor

squito commented Aug 3, 2015

Hey Patrick,

that sounds good. I agree that there are probably a couple of different concerns here. My biggest concern was the overall direction we're heading on metrics ... as we've said, merging this is "safe" in the sense that it lets us change things later without being tied to anything. Sorry if I sounded snarky -- just expressing my objection to this direction without making a big fuss, if it's too late to do anything about it.

@andrewor14 andrewor14 deleted the expose-memory-metrics branch August 3, 2015 21:27
@squito
Contributor

squito commented Aug 3, 2015

Other concerns aside ... can somebody explain the logic for resetting the accumulator values to me?

IIUC you reset the values:

a) on the initial instance
b) if the RDD in the dependency has entirely been evicted from the cache
c) if a stage fails prior to any task completions (but any remaining tasks from that stage attempt may still increment accumulators, I'm not certain)
d) if a stage fails in a way that we believe (in one go) that all map output data has been lost

you do not reset the values if:

e) only some of the parent RDD has been evicted from the cache
f) if the stage fails after we've already got some map output registered
g) a stage has lost all of its map output, but spark doesn't "realize" this until after a few rounds of stage attempts (which is not that crazy at all -- perhaps even more likely than (d), though I am saying this anecdotally)
h) a stage has lost 99% of its map output data, but there is still one partition hanging on for dear life.

Is my understanding wrong? Is this all intended behavior, or just "good enough" behavior?
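The hazard behind cases (f)-(h) can be shown with a toy model (not Spark code): if a stage's accumulator is not reset before a retry, updates from the failed attempt and the retried attempt both land in the same value, inflating the metric.

```java
// Toy model of the double-counting hazard on stage retries. If reset()
// is not called between attempts, the retried tasks' updates stack on
// top of the failed attempt's updates.
final class StageAccumulator {
    private long value = 0L;

    void reset() { value = 0L; }

    void addTaskUpdate(long bytes) { value += bytes; }

    long value() { return value; }
}
```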


val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
  info.accumulables
    .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
Member


Why do we just take the first one rather than the largest?
