Skip to content

Commit a4b1397

Browse files
committed
stage metric distributions
1 parent e48ba32 commit a4b1397

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMe
2424
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
2525
import org.apache.spark.ui.SparkUI
2626
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
27+
import org.apache.spark.util.Distribution
2728

2829
@Produces(Array(MediaType.APPLICATION_JSON))
2930
private[v1] class AllStagesResource(uiRoot: UIRoot) {
@@ -145,6 +146,84 @@ private[v1] object AllStagesResource {
145146
)
146147
}
147148

149+
/**
 * Builds quantile distributions of task metrics for one stage, for the REST API.
 *
 * @param allTaskData UI data for every task of the stage; tasks without metrics are skipped
 * @param quantiles the probabilities (in [0, 1]) at which each metric is summarized
 * @return per-metric quantile values, with optional sub-groups omitted when absent
 */
def taskMetricDistributions(
    allTaskData: Seq[TaskUIData],
    quantiles: Array[Double]): TaskMetricDistributions = {

  // Tasks that never reported any metrics contribute nothing to the distributions.
  val rawMetrics = allTaskData.flatMap(_.taskMetrics)

  // Project each metrics object to a Double and summarize at the requested quantiles.
  // NOTE(review): Distribution(...) yields None for an empty input, so `.get` here
  // assumes at least one task reported metrics — confirm callers guarantee that.
  def getMetric[T](data: Seq[T], f: T => Double): IndexedSeq[Double] =
    Distribution(data.map(f)).get.getQuantiles(quantiles)

  // Shared scaffolding for the optional sub-groups of metrics (input, output,
  // shuffle read, shuffle write): gathers whichever sub-metric objects exist,
  // and reports None when no task carried that sub-group, so absent groups are
  // omitted rather than rendered as all-zero distributions.
  abstract class MetricHelper[I, O](f: InternalTaskMetrics => Option[I]) {
    val data: Seq[I] = rawMetrics.flatMap(f)

    // Subclasses assemble the public distributions object from submetricQuantiles.
    def build: O

    // Quantiles of one field across every task that has this sub-group.
    def submetricQuantiles(f: I => Double): IndexedSeq[Double] = getMetric(data, f)

    def metricOption: Option[O] = if (data.isEmpty) None else Some(build)
  }

  // Quantiles of a field that exists on every InternalTaskMetrics.
  def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
    getMetric(rawMetrics, f)

  val inputMetrics =
    new MetricHelper[InternalInputMetrics, InputMetricDistributions](_.inputMetrics) {
      def build: InputMetricDistributions = new InputMetricDistributions(
        bytesRead = submetricQuantiles(_.bytesRead),
        recordsRead = submetricQuantiles(_.recordsRead)
      )
    }.metricOption

  val outputMetrics =
    new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](_.outputMetrics) {
      def build: OutputMetricDistributions = new OutputMetricDistributions(
        bytesWritten = submetricQuantiles(_.bytesWritten),
        recordsWritten = submetricQuantiles(_.recordsWritten)
      )
    }.metricOption

  val shuffleReadMetrics =
    new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](
        _.shuffleReadMetrics) {
      def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
        readBytes = submetricQuantiles(_.totalBytesRead),
        readRecords = submetricQuantiles(_.recordsRead),
        remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
        remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
        localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
        totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
        fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
      )
    }.metricOption

  val shuffleWriteMetrics =
    new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](
        _.shuffleWriteMetrics) {
      def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
        writeBytes = submetricQuantiles(_.shuffleBytesWritten),
        writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
        writeTime = submetricQuantiles(_.shuffleWriteTime)
      )
    }.metricOption

  new TaskMetricDistributions(
    quantiles = quantiles,
    executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
    executorRunTime = metricQuantiles(_.executorRunTime),
    resultSize = metricQuantiles(_.resultSize),
    jvmGcTime = metricQuantiles(_.jvmGCTime),
    resultSerializationTime = metricQuantiles(_.resultSerializationTime),
    memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
    diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
    inputMetrics = inputMetrics,
    outputMetrics = outputMetrics,
    shuffleReadMetrics = shuffleReadMetrics,
    shuffleWriteMetrics = shuffleWriteMetrics
  )
}
226+
148227
def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
149228
new AccumulableInfo(acc.id, acc.name, acc.update, acc.value)
150229
}

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,50 @@ class ShuffleWriteMetrics(
194194
val recordsWritten: Long
195195
)
196196

197+
/**
 * Quantile summaries of per-task metrics across all tasks of a stage.
 *
 * Each `IndexedSeq[Double]` holds one value per entry of `quantiles`, in the
 * same order. The four `Option`-typed sub-groups are `None` when no task in
 * the stage reported that kind of metrics (see
 * `AllStagesResource.taskMetricDistributions`).
 */
class TaskMetricDistributions(
  // The quantile probabilities the distributions below were computed at.
  val quantiles: IndexedSeq[Double],

  val executorDeserializeTime: IndexedSeq[Double],
  val executorRunTime: IndexedSeq[Double],
  val resultSize: IndexedSeq[Double],
  val jvmGcTime: IndexedSeq[Double],
  val resultSerializationTime: IndexedSeq[Double],
  val memoryBytesSpilled: IndexedSeq[Double],
  val diskBytesSpilled: IndexedSeq[Double],

  // Optional sub-groups: None means no task carried that metric group.
  val inputMetrics: Option[InputMetricDistributions],
  val outputMetrics: Option[OutputMetricDistributions],
  val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
  val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions]
)
213+
214+
/**
 * Quantile distributions of input metrics, one value per requested quantile.
 * Present only for stages where at least one task reported input metrics.
 */
class InputMetricDistributions(
  val bytesRead: IndexedSeq[Double],
  val recordsRead: IndexedSeq[Double]
)
218+
219+
/**
 * Quantile distributions of output metrics, one value per requested quantile.
 * Present only for stages where at least one task reported output metrics.
 */
class OutputMetricDistributions(
  val bytesWritten: IndexedSeq[Double],
  val recordsWritten: IndexedSeq[Double]
)
223+
224+
225+
/**
 * Quantile distributions of shuffle-read metrics, one value per requested
 * quantile. Present only for stages where at least one task reported
 * shuffle-read metrics.
 */
class ShuffleReadMetricDistributions(
  // Total bytes read (remote + local; populated from totalBytesRead).
  val readBytes: IndexedSeq[Double],
  val readRecords: IndexedSeq[Double],
  val remoteBlocksFetched: IndexedSeq[Double],
  val localBlocksFetched: IndexedSeq[Double],
  val fetchWaitTime: IndexedSeq[Double],
  val remoteBytesRead: IndexedSeq[Double],
  val totalBlocksFetched: IndexedSeq[Double]
)
234+
235+
/**
 * Quantile distributions of shuffle-write metrics, one value per requested
 * quantile. Present only for stages where at least one task reported
 * shuffle-write metrics.
 */
class ShuffleWriteMetricDistributions(
  val writeBytes: IndexedSeq[Double],
  val writeRecords: IndexedSeq[Double],
  val writeTime: IndexedSeq[Double]
)
240+
197241
class AccumulableInfo (
198242
val id: Long,
199243
val name: String,

0 commit comments

Comments
 (0)