[SPARK-19447] Make Range operator generate "recordsRead" metric #16960
Conversation
def run(df: DataFrame): List[(Long, Long, Long)] = {
Document what the three Longs are for?
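A hedged sketch of the documentation being asked for, assuming the three Longs mirror the fields of MetricsResult further down in this diff (the first element's name is an assumption; only shuffleRecordsRead and sumMaxOutputRows are visible here):

```scala
/**
 * Runs the job on `df` and returns one triple per stage, assumed to be:
 *   (records read, shuffle records read, sum of max output rows).
 * Field meanings are inferred from MetricsResult; not confirmed in this thread.
 */
def run(df: DataFrame): List[(Long, Long, Long)] = {
```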
stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
}

def getResults(): List[(Long, Long, Long)] = {
Same here: document what the three Longs are for.
cc @hvanhovell if you have a min to review this ...
Test build #73004 has finished for PR 16960 at commit

Test build #73005 has finished for PR 16960 at commit
object InputOutputMetricsHelper {
  private class InputOutputMetricsListener extends SparkListener {
    private case class MetricsResult(
Nit: add space
var shuffleRecordsRead: Long = 0L,
var sumMaxOutputRows: Long = 0L)

private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult]
Make this a val.
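One way to act on the suggestion, sketched under the assumption that getResults() currently resets the map by reassigning it (as the reassignment quoted above suggests): keep a val referencing a mutable map and clear it in place.

```scala
import scala.collection.mutable

// val, not var: the reference never changes, only the map's contents do.
private[this] val stageIdToMetricsResult = mutable.HashMap.empty[Int, MetricsResult]

// In getResults(), reset with clear() instead of reassigning an empty map:
//   stageIdToMetricsResult.clear()
```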
def run(df: DataFrame): List[(Long, Long, Long)] = {
  val spark = df.sparkSession
  val sparkContext = spark.sparkContext
  val listener = new InputOutputMetricsListener()
Use try...finally here
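A minimal sketch of the suggested try...finally shape, assuming the listener is registered with addSparkListener and should be removed even if the job fails:

```scala
sparkContext.addSparkListener(listener)
try {
  df.collect()  // run the job while the listener observes task-end events
} finally {
  // always detach the listener, even when the job throws
  sparkContext.removeSparkListener(listener)
}
```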
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
  val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, { MetricsResult() })
Nit: remove the curly braces.
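Since getOrElseUpdate takes its second argument by name, the braces are redundant; the suggested form would be:

```scala
val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, MetricsResult())
```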
spark.read.parquet(dir).createOrReplaceTempView("pqS")

val res3 = InputOutputMetricsHelper.run(
  spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF()
This is hard to reason about. Could you add a few lines of documentation?
LGTM - pending Jenkins.
Test build #73057 has finished for PR 16960 at commit

Merging in master.
The Range was modified to produce "recordsRead" metric instead of "generated rows". The tests were updated and partially moved to SQLMetricsSuite.

Unit tests.

Author: Ala Luszczak <[email protected]>

Closes apache#16960 from ala/range-records-read.

(cherry picked from commit b486ffc)
Signed-off-by: Reynold Xin <[email protected]>
I think this commit has left the numGeneratedRows metric behind, hasn't it? (it was added in #16829)
True. There are a couple of lines left behind that should be removed with this change; numGeneratedRows should be gone.
I'll have a look at this this week and send a PR unless you beat me to it :) Thanks @ala!
Thanks @jaceklaskowski - it's already done: #17939
What changes were proposed in this pull request?
The Range operator was modified to produce the "recordsRead" metric instead of "generated rows". The tests were updated and partially moved to SQLMetricsSuite.
How was this patch tested?
Unit tests.
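A hedged sketch of what the change means in practice: rows emitted by Range now count toward the standard input metric, so a SparkListener should see them in recordsRead. The listener API below is the standard one; the range size is illustrative and not taken from this PR's tests.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulate records read across all tasks of a job driven by spark.range.
@volatile var recordsRead = 0L
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
})
spark.range(0, 1000).collect()
// After this PR, the Range stage contributes its row count to recordsRead
// instead of reporting a separate "generated rows" SQL metric.
```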