Commits (27)
3f8321a
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
cd16a75
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
94c2b04
Separate metrics for jvm, python and others and update the tests
Aug 8, 2018
062f5d7
Update JsonProtocolSuite
Sep 25, 2018
245221d
[SPARK-24958] Add executors' process tree total memory information to…
Oct 2, 2018
c72be03
Addressing most of Imran's comments
Oct 3, 2018
8f3c938
Fixing the scala style and some minor comments
Oct 3, 2018
f2dca27
Removing types from the definitions wherever possible
Oct 4, 2018
a9f924c
Using Utils methods when possible or use ProcessBuilder
Oct 5, 2018
a11e3a2
make use of Utils.trywithresources
Oct 5, 2018
34ad625
Changing ExecutorMetricType and ExecutorMetrics to use a map instead o…
Oct 9, 2018
415f976
Changing ExecutorMetric to use array instead of a map
Oct 10, 2018
067b81d
A small cosmetic change
Oct 10, 2018
18ee4ad
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Oct 17, 2018
7f7ed2b
Applying latest review comments. Using Arrays instead of Map for ret…
Oct 23, 2018
f3867ff
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Nov 5, 2018
0f8f3e2
Fix an issue with JsonProtocolSuite
Nov 5, 2018
ea08c61
Fix scalastyle issue
Nov 5, 2018
8f20857
Applying latest review comments
Nov 14, 2018
6e65360
Using the companion object and other stuff
Nov 27, 2018
4659f4a
Update the use of process builder and applying other review comments
Nov 28, 2018
ef4be38
Small style fixes based on reviews
Nov 30, 2018
805741c
Applying review comments, mostly style related
Nov 30, 2018
4c1f073
Remove the unnecessary trywithresources
Nov 30, 2018
0a7402e
Applying the comment about error handling and some more style fixes
Dec 4, 2018
3d65b35
Removing a return
Dec 6, 2018
6eab315
Reordering of info in a test resource file to avoid confusion
Dec 6, 2018
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -60,9 +60,11 @@ private[spark] class Heartbeater(
}

/**
- * Get the current executor level metrics. These are returned as an array
+ * Get the current executor level metrics. These are returned as an array, with the index
+ * determined by ExecutorMetricType.metricToOffset
*/
def getCurrentMetrics(): ExecutorMetrics = {

val metrics = new Array[Long](ExecutorMetricType.numMetrics)
var offset = 0
ExecutorMetricType.metricGetters.foreach { metric =>
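The updated doc comment pins down the array contract: each metric set contributes names.length consecutive slots, starting at the running offset of all earlier sets. Below is a minimal standalone sketch of that filling loop; the getter objects and their values are invented for illustration, and Spark's real getters in ExecutorMetricType.metricGetters additionally take a memory manager.

```scala
// Sketch only: models how getCurrentMetrics lays values out in one flat array.
trait MetricGetter {
  def names: Seq[String]              // one name per value produced
  def getMetricValues: Array[Long]    // values, aligned with `names`
}

object HeapGetter extends MetricGetter {
  val names = Seq("JVMHeapMemory")
  def getMetricValues: Array[Long] = Array(Runtime.getRuntime.totalMemory())
}

object TreeGetter extends MetricGetter {
  val names = Seq("ProcessTreeJVMVMemory", "ProcessTreeJVMRSSMemory")
  def getMetricValues: Array[Long] = Array(0L, 0L)  // placeholder values
}

object MetricArraySketch extends App {
  val getters: Seq[MetricGetter] = Seq(HeapGetter, TreeGetter)
  val metrics = new Array[Long](getters.map(_.names.length).sum)
  var offset = 0
  getters.foreach { g =>
    val values = g.getMetricValues
    Array.copy(values, 0, metrics, offset, values.length)
    offset += values.length
  }
  // metrics(0) = JVMHeapMemory; metrics(1) and metrics(2) = the process tree pair.
}
```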
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -27,7 +27,7 @@ import org.apache.spark.metrics.ExecutorMetricType
*/
@DeveloperApi
class ExecutorMetrics private[spark] extends Serializable {
-
+ // Metrics are indexed by ExecutorMetricType.metricToOffset
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
// the first element is initialized to -1, indicating that the values for the array
// haven't been set yet.
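On the read side, that comment is the whole contract: consumers resolve a metric name through ExecutorMetricType.metricToOffset rather than hard-coding indices. A hedged model of the lookup, assuming an accessor along these lines; the metric names are real Spark metric names, but the offsets and values here are invented:

```scala
import scala.collection.mutable

object MetricLookupSketch extends App {
  // Stand-in for ExecutorMetricType.metricToOffset.
  val metricToOffset = mutable.LinkedHashMap("JVMHeapMemory" -> 0, "JVMOffHeapMemory" -> 1)
  val metrics = Array(512L << 20, 64L << 20)  // made-up byte values

  // Resolve a metric by name instead of by array position.
  def getMetricValue(name: String): Long = metrics(metricToOffset(name))

  assert(getMetricValue("JVMOffHeapMemory") == (64L << 20))
}
```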
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -138,19 +138,22 @@ private[spark] class ProcfsMetricsGetter(
}
val stdoutThread = Utils.processStreamByLine("read stdout for pgrep",
process.getInputStream, appendChildPid)
- val error = process.getErrorStream
- var errorString = ""
- (0 until error.available()).foreach { i =>
-   errorString += error.read()
- }
+ val errorStringBuilder = new StringBuilder()
+ val stdErrThread = Utils.processStreamByLine(
+   "stderr for pgrep",
+   process.getErrorStream,
+   { line =>
+     errorStringBuilder.append(line)
+   })
val exitCode = process.waitFor()
stdoutThread.join()
+ stdErrThread.join()
+ val errorString = errorStringBuilder.toString()
// pgrep will have an exit code of 1 if there is more than one child process
// and an exit code of 2 if there is no child process
if (exitCode != 0 && exitCode > 2) {
val cmd = builder.command().toArray.mkString(" ")
- logWarning(s"Process $cmd" +
-   s" exited with code $exitCode, with stderr:" + s"${errorString} ")
+ logWarning(s"Process $cmd exited with code $exitCode and stderr: $errorString")
throw new SparkException(s"Process $cmd exited with code $exitCode")
}
childPidsInInt
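The change above replaces the `error.available()` loop, which only sees bytes already buffered at the moment of the call, with a dedicated reader thread. Draining stderr concurrently also avoids the classic deadlock where a child blocks writing to a full pipe while the parent blocks in waitFor(). Utils.processStreamByLine is Spark-internal, so the generic sketch below uses a plain thread instead; StderrDrainSketch and the `pgrep -P 1` command are illustrative only:

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

object StderrDrainSketch extends App {
  val process = new ProcessBuilder("pgrep", "-P", "1").start()

  // Drain stderr on its own thread so the child can never block on a full pipe.
  val stderr = new StringBuilder
  val drainer = new Thread(() => {
    val reader = new BufferedReader(
      new InputStreamReader(process.getErrorStream, StandardCharsets.UTF_8))
    var line = reader.readLine()
    while (line != null) {
      stderr.append(line).append('\n')
      line = reader.readLine()
    }
  })
  drainer.start()

  // Read stdout on the current thread while the stderr thread drains in parallel.
  val stdout = scala.io.Source.fromInputStream(process.getInputStream).mkString

  val exitCode = process.waitFor()
  drainer.join()
  if (exitCode > 2) {  // mirroring the PR: pgrep exit codes 1 and 2 are tolerated
    throw new RuntimeException(s"pgrep exited with $exitCode, stderr: $stderr")
  }
  println(stdout)
}
```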
@@ -165,43 +168,37 @@

def addProcfsMetricsFromOneProcess(
    allMetrics: ProcfsMetrics,
-   pid: Int):
-   ProcfsMetrics = {
+   pid: Int): ProcfsMetrics = {

- // Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory
- // info. I tried that but found it not correct during tests, so I used normal string analysis
- // instead. The computation of RSS and Vmem are based on proc(5):
+ // The computation of RSS and Vmem are based on proc(5):
// http://man7.org/linux/man-pages/man5/proc.5.html
try {
val pidDir = new File(procfsDir, pid.toString)
- Utils.tryWithResource( new InputStreamReader(
+ Utils.tryWithResource(new InputStreamReader(
new FileInputStream(
new File(pidDir, procfsStatFile)), Charset.forName("UTF-8"))) { fReader =>
Utils.tryWithResource( new BufferedReader(fReader)) { in =>
val procInfo = in.readLine
val procInfoSplit = procInfo.split(" ")
- if (procInfoSplit != null) {
-   val vmem = procInfoSplit(22).toLong
-   val rssPages = procInfoSplit(23).toLong
-   if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
-     return allMetrics.copy(
-       jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
-       jvmRSSTotal = allMetrics.jvmRSSTotal + (rssPages*pageSize)
-     )
-   }
-   else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
-     return allMetrics.copy(
-       pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
-       pythonRSSTotal = allMetrics.pythonRSSTotal + (rssPages*pageSize)
-     )
-   }
-   return allMetrics.copy(
-     otherVmemTotal = allMetrics.otherVmemTotal + vmem,
-     otherRSSTotal = allMetrics.otherRSSTotal + (rssPages*pageSize)
-   )
- }
- else {
-   return ProcfsMetrics(0, 0, 0, 0, 0, 0)
- }
+ val vmem = procInfoSplit(22).toLong
+ val rssMem = procInfoSplit(23).toLong * pageSize
+ if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
+   allMetrics.copy(
+     jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
+     jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
+   )
+ }
+ else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
+   allMetrics.copy(
+     pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
+     pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
+   )
+ }
+ else {
+   allMetrics.copy(
+     otherVmemTotal = allMetrics.otherVmemTotal + vmem,
+     otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
+   )
+ }
}
}
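For reference, fields 23 and 24 of /proc/[pid]/stat in proc(5)'s 1-based numbering are vsize (virtual memory in bytes) and rss (resident pages), which is why the zero-based split hits indices 22 and 23 and why only rss is scaled by the page size. A standalone sketch of the same parse follows; it is Linux-only, requires Java 9+ for ProcessHandle, and assumes the common 4096-byte page size rather than querying it:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object ProcStatSketch extends App {
  // Page size is normally read from `getconf PAGESIZE`; 4096 is assumed here.
  val pageSize = 4096L
  val pid = ProcessHandle.current().pid()

  val stat = new String(
    Files.readAllBytes(Paths.get(s"/proc/$pid/stat")), StandardCharsets.UTF_8)
  // NB: like the PR's code, splitting on spaces assumes the comm field
  // (the executable name in parentheses) contains no spaces itself.
  val fields = stat.split(" ")
  val comm = fields(1)                          // executable name, in parentheses
  val vmemBytes = fields(22).toLong             // vsize: virtual memory, bytes
  val rssBytes = fields(23).toLong * pageSize   // rss: resident pages -> bytes
  println(s"$comm vmem=$vmemBytes rss=$rssBytes")
}
```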
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -146,9 +146,8 @@ private[spark] object ExecutorMetricType {
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
- var metricInSet = 0
- while (metricInSet < m.names.length) {
-   definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + numberOfMetrics))
-   metricInSet += 1
+ (0 until m.names.length).foreach { idx =>
+   definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
numberOfMetrics += m.names.length
}
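The rewritten loop assigns each metric a global offset equal to its index within its own metric set plus the running total of all earlier sets. A tiny worked example of that layout, with two invented metric sets standing in for Spark's metricGetters:

```scala
import scala.collection.mutable

object OffsetLayoutSketch extends App {
  // Two hypothetical metric sets: the first exposes 1 name, the second 2.
  val metricSets: Seq[Seq[String]] = Seq(
    Seq("JVMHeapMemory"),
    Seq("ProcessTreeJVMVMemory", "ProcessTreeJVMRSSMemory")
  )

  var numberOfMetrics = 0
  val metricToOffset = mutable.LinkedHashMap.empty[String, Int]
  metricSets.foreach { names =>
    (0 until names.length).foreach { idx =>
      metricToOffset += (names(idx) -> (idx + numberOfMetrics))
    }
    numberOfMetrics += names.length
  }

  // JVMHeapMemory -> 0, ProcessTreeJVMVMemory -> 1, ProcessTreeJVMRSSMemory -> 2
  assert(metricToOffset("ProcessTreeJVMRSSMemory") == 2)
  assert(numberOfMetrics == 3)
}
```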
core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkFunSuite

class ProcfsMetricsGetterSuite extends SparkFunSuite {

- val p = new ProcfsMetricsGetter(getTestResourcePath("ProcessTree"), 4096L)
+ val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"), 4096L)

test("testGetProcessInfo") {
var r = ProcfsMetrics(0, 0, 0, 0, 0, 0)