Rewrite Spark fetcher/heuristics. #162

Merged: 13 commits merged into linkedin:master on Dec 13, 2016

Conversation

rayortigas (Contributor)

The purpose of this update is to:

  • rewrite the Spark data fetcher to use Spark event logs minimally, since it can be expensive to download and process them fully, as was done before
  • rewrite the Spark data fetcher to use the Spark monitoring REST API (https://spark.apache.org/docs/1.4.1/monitoring.html#rest-api), which provides almost all of the information the Spark heuristics need
  • update the Spark heuristics to provide (hopefully) more useful information and to avoid being arbitrarily restrictive

The new Spark-related code is provided in parallel to the old Spark-related code. To enable it:

  • Uncomment and swap in the appropriate fragments in AggregatorConf.xml, FetcherConf.xml, and HeuristicConf.xml.
  • Set SPARK_CONF_DIR (or SPARK_HOME) to an appropriate location so that Dr. Elephant can find spark-defaults.conf.

Heuristics added:

  • "Executor shuffle read bytes distribution": We now provide a distribution with min/25p/median/75p/max, with severity based on a max-to-median ratio.
  • "Executor shuffle write bytes distribution": We now provide a distribution with min/25p/median/75p/max, with severity based on a max-to-median ratio.

Heuristics changed:

  • "Average input size" -> "Executor input bytes distribution": Instead of providing an average along with min/max, we now provide a distribution with min/25p/median/75p/max, with severity based on a max-to-median ratio.
  • "Average peak storage memory" -> "Executor storage memory used distribution": Instead of providing an average along with min/max, we now provide a distribution with min/25p/median/75p/max, with severity based on a max-to-median ratio.
  • "Average runtime" -> "Executor task time distribution": Instead of providing an average along with min/max, we now provide a distribution with min/25p/median/75p/max, with severity based on a max-to-median ratio.
  • "Memory utilization rate" -> "Executor storage memory utilization rate": The old name seemed to imply total memory, but this is just the utilization rate for storage memory, so it has been relabeled to indicate that. Shuffle memory is important too, but we don't seem to have access to shuffle memory utilization metrics.
  • "Total memory used at peak" -> "Total executor storage memory used": This also refers to storage memory. It has been relabeled to indicate that.
  • "Spark problematic stages" -> ("Spark stages with high task failure rates", "Spark stages with long average executor runtimes"): This was a combination of stages with high task failure rates and those with long runtimes. Those have been separated.
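
For reference, a rough sketch of the min/25p/median/75p/max distribution and the max-to-median severity signal used by several of the heuristics above (illustrative names and types, not the PR's exact code):

```scala
// Sketch: summarize a per-executor metric as min/25p/median/75p/max and derive
// a skew signal from the max-to-median ratio. Assumes a non-empty input.
object DistributionSketch {
  def distribution(values: Seq[Long]): (Long, Long, Long, Long, Long) = {
    val sorted = values.sorted
    def percentile(q: Double): Long =
      sorted(math.min(sorted.size - 1, (q * sorted.size).toInt))
    (sorted.head, percentile(0.25), percentile(0.50), percentile(0.75), sorted.last)
  }

  // A ratio near 1 means the metric is spread evenly across executors; a large
  // ratio means a few executors dominate, which maps to a higher severity.
  def maxToMedianRatio(values: Seq[Long]): Double = {
    val (_, _, median, _, max) = distribution(values)
    if (median == 0L) { if (max == 0L) 1.0 else Double.PositiveInfinity }
    else max.toDouble / median
  }
}
```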

Heuristics removed:

  • spark.executor.cores: I think this is somewhat discretionary. At the very least, our internal recommendation stopped matching the one in Dr. Elephant.
  • spark.shuffle.manager: This was changed to "sort" by default as of Spark 1.2, so there is no current use for checking this setting.
  • "Average output size": Metrics related to output size appear to be deprecated or nonexistent, so there is no longer any use for this heuristic.

Finally, overall waste metrics are calculated based on allocation [app runtime * # of executors * executor memory] vs. usage [total executor run time * executor memory]. They were previously calculated based only on storage memory and some 50% buffer, which I didn't understand.
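
As a rough sketch of that bookkeeping (the helper names mirror the aggregator code quoted later in this review, but the exact signatures here are assumptions):

```scala
// Sketch: waste = allocation - usage, both expressed in MB-seconds.
object ResourceWasteSketch {
  private val BytesPerMB = 1024L * 1024L

  // allocation = app runtime * number of executors * executor memory
  def allocatedMBSeconds(executorInstances: Int,
                         executorMemoryBytes: Long,
                         applicationDurationMillis: Long): Long =
    (executorMemoryBytes / BytesPerMB) * executorInstances * (applicationDurationMillis / 1000L)

  // usage = total executor task time * executor memory
  def usedMBSeconds(executorMemoryBytes: Long,
                    totalExecutorTaskTimeMillis: Long): Long =
    (executorMemoryBytes / BytesPerMB) * (totalExecutorTaskTimeMillis / 1000L)

  def wastedMBSeconds(allocated: Long, used: Long): Long = allocated - used
}
```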

Added unit tests, and also tested against our internal cluster as much as I practically could. I will need help to fully validate.

@rayortigas (Contributor, Author)

I don't know how to make the JDK 6 build pass: https://travis-ci.org/linkedin/dr-elephant/jobs/176240296.

If I downgrade org.glassfish.jersey from 2.24 to 2.6, the last Java 6-compatible version, I get another set of problems with trying to use com.sun.jersey for some reason.

If we want this PR to happen, we need to move forward and stop supporting Java 6.

@shankar37 (Contributor) left a comment

I need to review more, but it looks good overall.

val resourcesAllocatedMBSeconds =
aggregateResourcesAllocatedMBSeconds(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesUsedMBSeconds = aggregateResourcesUsedMBSeconds(executorMemoryBytes, totalExecutorTaskTimeMillis)
val resourcesWastedMBSeconds = resourcesAllocatedMBSeconds - resourcesUsedMBSeconds
Contributor:

Multiply resourcesUsed by 1.5 (configurable) to give a 50% buffer. The reason for the 50% buffer is that it will be impossible for users to use all allocated resources, but if they use less than 2/3 of the allocated resources, then we flag it.
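
To make that rule concrete with invented numbers: if an app is allocated 3,000 MB-hours and uses 1,800 MB-hours, then 1,800 × 1.5 = 2,700, which is still below the 3,000 allocated, so the app used less than 2/3 of its allocation and would be flagged.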

Contributor:

Is this fixed? I don't see where the 1.5x factor is used.

Commit: SBT 0.13.0 doesn't pull in the correct transitive dependencies from Jersey (sbt/sbt#847).
@akshayrai (Contributor)

@shankar37, do you have anything to add? Can we push this?

@rayortigas, could you fix the conflicts, after which I think we can ship it?

@shankar37 (Contributor) left a comment

I still need to look at the heuristic classes.

*/
case class SeverityThresholds(low: Number, moderate: Number, severe: Number, critical: Number, ascending: Boolean) {
if (ascending) {
require(low.doubleValue <= moderate.doubleValue)
Contributor:

Use < instead. I can't think of a scenario where it will be equal.

Contributor (Author):

The old code used equal numbers for severe and critical for some thresholds, e.g.:

    private double[] avgJobFailureLimits = {0.1d, 0.3d, 0.5d, 0.5d}; // The avg job failure rate

Also, getSeverityAscending(Number value, Number low, Number moderate, Number severe, ...), for example, will check for equal values.
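
For what it's worth, a minimal sketch of how ascending thresholds behave with >= comparisons when severe and critical are equal (illustrative only, not the PR's actual Severity type):

```scala
// Sketch: with >= comparisons, equal severe/critical thresholds just mean that a
// value at that boundary is reported at the higher (critical) level.
case class SeverityThresholdsSketch(low: Double, moderate: Double, severe: Double, critical: Double) {
  require(low <= moderate && moderate <= severe && severe <= critical)

  def severityOf(value: Double): String =
    if (value >= critical) "CRITICAL"
    else if (value >= severe) "SEVERE"
    else if (value >= moderate) "MODERATE"
    else if (value >= low) "LOW"
    else "NONE"
}

// Example: severe == critical == 0.5, as in the old avgJobFailureLimits above.
// SeverityThresholdsSketch(0.1, 0.3, 0.5, 0.5).severityOf(0.5) == "CRITICAL"
```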

/**
* A heuristic based on metrics for a Spark app's jobs.
*
* This heuristic reports job failures and high task failure rates for each job.
Contributor:

How about stages?

import com.linkedin.drelephant.analysis.{ApplicationType, HadoopApplicationData}


case class SparkComboApplicationData(
Contributor:

Can it just be called SparkApplicationData? That it's a combo of log- and REST-derived data is an implementation detail, no?

Contributor:

I see that detail is not hidden. Can we not expose that to the caller?

Contributor (Author):

SparkApplicationData already exists as an interface for the old fetcher to use: https://github.com/linkedin/dr-elephant/blob/dad905cdafa6aa7665059917a455cb601155fbd1/app/com/linkedin/drelephant/spark/data/SparkApplicationData.java. I didn't want to use this interface, so to preserve backwards compatibility for now I contrived SparkComboApplicationData.

Contributor:

I think my point is twofold:

  1. The name has to be changed to hide the fact that it's a "Combo".
  2. The interface seems to expose the fact that it's made of REST- and log-derived data. I want to hide that.

Can you also elaborate on why you couldn't use the existing SparkApplicationData interface? Is it because the data and its structure have changed significantly?

Contributor (Author):

Correct, the data and its structure changed significantly enough to warrant not using SparkApplicationData.

Under normal circumstances I wouldn't use the Combo qualifier at all; I would delete the existing Spark stuff and just make this class the new SparkApplicationData. However, seeing that the new code is to be treated as experimental, I need some qualifier. It doesn't seem like a big deal to temporarily identify the implementation and then rename it when we are comfortable removing experimental status. If you have another suggestion, let me know.

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.api.v1.StageStatus

class ApplicationInfo(
Contributor:

Why are these not case classes? Wouldn't that be more appropriate?

Contributor (Author):

Furthermore, one of these classes has more than 22 fields, so it can't be converted to a case class anyway, at least for Scala 2.10.

* License for the specific language governing permissions and limitations under
* the License.
*/
package com.linkedin.drelephant.spark.fetchers.statusapiv1
Contributor:

Why is it named statusapiv1? Does the 1 here indicate the Spark version or something else?

Try(getProperty(SPARK_DRIVER_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)

lazy val executorMemoryBytes: Option[Long] =
Try(getProperty(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)
Contributor:

Nit: spacing. It seems like a new line was added for no reason.


val resourcesWastedMBSeconds =
((BigDecimal(resourcesAllocatedMBSeconds) * (1.0 - allocatedMemoryWasteBufferPercentage)) - BigDecimal(resourcesUsedMBSeconds))
.toBigInt
Contributor (Author):

@shankar37 I restored the waste buffer.

The previous calculation was based on discounting 1/3 of storage (cache) memory, whereas the new calculation is based on discounting 1/2 of all memory, storage and shuffle, since from observation it seemed like a bigger number would be appropriate.

As I think I said before, I have some reservations about how this number is even derived. I don't have any better ideas, though, so I'm willing to commit here.
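
To make the restored formula concrete with invented numbers: with resourcesAllocatedMBSeconds = 1,000,000, allocatedMemoryWasteBufferPercentage = 0.5, and resourcesUsedMBSeconds = 300,000, the wasted figure comes out to 1,000,000 × (1 − 0.5) − 300,000 = 200,000 MB-seconds.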

@rayortigas (Contributor, Author)

FYI, I had to force-push this PR since I had to rebase and resolve conflicts.

@rayortigas (Contributor, Author)

CI passed for Java 8 and 7, but as before, it is not working for Java 6.

@shankar37 (Contributor)

Looks good to me except for the SparkComboApplicationData issue.

@rayortigas (Contributor, Author)

OK, I just discussed this with @shankar37, and he is fine with me deleting the old Spark fetcher/heuristics altogether. If there are any problems with the new code, we'll fix them going forward.

Deleting the old code should resolve my naming issue. @shankar37 also had a separate point about the data class exposing implementation details, which I understand now. I'll refactor and clean it up.

@rayortigas (Contributor, Author)

@shankar37 I updated the PR; you can see the 5 commits added today: https://github.com/linkedin/dr-elephant/pull/162/commits

@rayortigas (Contributor, Author)

I found a small bug in the latest change while testing against our cluster; I will update this PR today or tomorrow.

@rayortigas (Contributor, Author)

Did some testing against our cluster; I'm fine from my end now.

@akshayrai merged commit 28f4025 into linkedin:master on Dec 13, 2016.
@rayortigas (Contributor, Author)

Thanks for all your help @shankar37 and @akshayrai, and for shepherding this through!

akshayrai pushed a commit that referenced this pull request Feb 22, 2017
shkhrgpt added a commit to shkhrgpt/dr-elephant that referenced this pull request Feb 28, 2017
* Fix linkedin#162 with the right calculation for resourceswasted and  add missing workflow links (linkedin#207)

* Fix Exception thrown when JAVA_EXTRA_OPTIONS is not present (linkedin#210)
shkhrgpt added a commit to shkhrgpt/dr-elephant that referenced this pull request Mar 8, 2017
* Fix linkedin#162 with the right calculation for resourceswasted and  add missing workflow links (linkedin#207)

* Fix Exception thrown when JAVA_EXTRA_OPTIONS is not present (linkedin#210)

* Adds an option to fetch recently finished apps from RM (linkedin#212)

* Fixes issue caused by http in history server config property (linkedin#217)

* add config for timezone of job history server (linkedin#214)

* Include reference to the weekly meeting
skakker pushed a commit to skakker/dr-elephant that referenced this pull request Dec 14, 2017
* Rewrite Spark fetcher/heuristics.
skakker pushed a commit to skakker/dr-elephant that referenced this pull request Dec 14, 2017