
Conversation

@ash211 (Contributor) commented Mar 23, 2017

Registers the shuffle server's metrics with the Hadoop Node Manager's
DefaultMetricsSystem.

What changes were proposed in this pull request?

Expose the shuffle service metrics not only through the ExternalShuffleService, as done in SPARK-16405, but also through the YarnShuffleService. Because the YARN Node Manager's metrics system is the Hadoop-internal one rather than the Dropwizard Metrics system that Spark uses, this requires converting Dropwizard (DW) metrics to Hadoop metrics.
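
For illustration, here is a minimal sketch of that conversion, assuming the shuffle handler's Dropwizard metrics are exposed as a `MetricSet` and using Hadoop's `Interns.info` helper; the record name, metric descriptions, and `MetricsInfo` implementation in the actual patch may differ.

```java
import java.util.Map;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.Interns;

// Sketch only: adapts a Dropwizard MetricSet to Hadoop's MetricsSource interface.
public class YarnShuffleServiceMetrics implements MetricsSource {
  private final MetricSet metricSet;

  public YarnShuffleServiceMetrics(MetricSet metricSet) {
    this.metricSet = metricSet;
  }

  // Called by the Node Manager's metrics system; re-emits every Dropwizard metric
  // through the Hadoop collector under a single record.
  @Override
  public void getMetrics(MetricsCollector collector, boolean all) {
    MetricsRecordBuilder builder = collector.addRecord("shuffleService");
    for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
      collectMetric(builder, entry.getKey(), entry.getValue());
    }
  }

  // Translates one Dropwizard metric into Hadoop counters/gauges. The _count,
  // _rate1, _rate5, _rate15 and _rateMean suffixes mirror the JMX output below.
  public static void collectMetric(MetricsRecordBuilder builder, String name, Metric metric) {
    if (metric instanceof Timer) {
      Timer t = (Timer) metric;
      builder
        .addCounter(Interns.info(name + "_count", "Count of timer " + name), t.getCount())
        .addGauge(Interns.info(name + "_rate1", "1 min rate of timer " + name), t.getOneMinuteRate())
        .addGauge(Interns.info(name + "_rate5", "5 min rate of timer " + name), t.getFiveMinuteRate())
        .addGauge(Interns.info(name + "_rate15", "15 min rate of timer " + name), t.getFifteenMinuteRate())
        .addGauge(Interns.info(name + "_rateMean", "Mean rate of timer " + name), t.getMeanRate());
    } else if (metric instanceof Meter) {
      Meter m = (Meter) metric;
      builder
        .addCounter(Interns.info(name + "_count", "Count of meter " + name), m.getCount())
        .addGauge(Interns.info(name + "_rate1", "1 min rate of meter " + name), m.getOneMinuteRate())
        .addGauge(Interns.info(name + "_rate5", "5 min rate of meter " + name), m.getFiveMinuteRate())
        .addGauge(Interns.info(name + "_rate15", "15 min rate of meter " + name), m.getFifteenMinuteRate())
        .addGauge(Interns.info(name + "_rateMean", "Mean rate of meter " + name), m.getMeanRate());
    } else if (metric instanceof Gauge) {
      Object value = ((Gauge<?>) metric).getValue();
      if (value instanceof Integer) {
        builder.addGauge(Interns.info(name, "Gauge " + name), (Integer) value);
      }
      // Handling of other gauge value types is discussed in the review below.
    }
  }
}
```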

How was this patch tested?

Manual deploy of the new yarn-shuffle jar onto a Node Manager and verifying that the metrics appear in the standard Node Manager location. This is JMX, with a query endpoint at /jmx.

Resulting metrics look like this:

```
[user@host ~]$ curl -sk -XGET https://`hostname -f`:8042/jmx | jq . | grep 'shuffleservice' -B 1 -A 18
    {
      "name": "Hadoop:service=NodeManager,name=shuffleservice",
      "modelerType": "shuffleservice",
      "tag.Hostname": "<redacted>",
      "openBlockRequestLatencyMillis_count": 1,
      "openBlockRequestLatencyMillis_rate15": 0.0011080303990206543,
      "openBlockRequestLatencyMillis_rate5": 0.0033057092356765017,
      "openBlockRequestLatencyMillis_rate1": 0.015991117074135343,
      "openBlockRequestLatencyMillis_rateMean": 0.003843993699021382,
      "blockTransferRateBytes_count": 118,
      "blockTransferRateBytes_rate15": 0.1307475870844372,
      "blockTransferRateBytes_rate5": 0.39007368980982715,
      "blockTransferRateBytes_rate1": 1.8869518147479705,
      "blockTransferRateBytes_rateMean": 0.45359183094454836,
      "registeredExecutorsSize": 2,
      "registerExecutorRequestLatencyMillis_count": 2,
      "registerExecutorRequestLatencyMillis_rate15": 0.001697343764758814,
      "registerExecutorRequestLatencyMillis_rate5": 0.002970701813078509,
      "registerExecutorRequestLatencyMillis_rate1": 0.0005857750515146702,
      "registerExecutorRequestLatencyMillis_rateMean": 0.007687995987242345
    },
[user@host ~]$
```

@SparkQA commented Mar 23, 2017

Test build #75115 has finished for PR 17401 at commit 4caf4d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class YarnShuffleServiceMetrics implements MetricsSource

@jerryshao (Contributor):
Actually we follow two-space indentation for Java code in Spark; would you please change the format?

@ash211 (Contributor Author) commented Mar 24, 2017

Thanks for taking a look @jerryshao! I've reformatted to two-space indentation and run ./dev/lint-java to make sure this code passes the linter.

src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java still has some errors, but those are separate from this change.

* @param all if true, return all metrics even if unchanged.
*/
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
Contributor:

Is it possible to add a unit test to verify the correctness of converting codahale metrics to Hadoop metrics?

Contributor Author:

Should be able to; I'm working on creating one now. By correctness, I think you mostly mean that the values passed through are the same, even though the naming schemes are different?

Contributor:

Yes, you're right. From my understanding, correctness means a Gauge still converts to a Gauge and a Meter still to a Meter; I'm not sure whether that can be guaranteed?
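
A minimal sketch of such a test, written here in Java with Mockito purely for illustration (the test actually added to the PR is in Scala, as the imports quoted later in this thread show); it assumes the conversion lives in a static `YarnShuffleServiceMetrics.collectMetric(MetricsRecordBuilder, String, Metric)` helper.

```java
import static org.mockito.Mockito.*;

import com.codahale.metrics.Meter;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;

public class YarnShuffleServiceMetricsSuite {

  @Test
  public void meterIsConvertedToHadoopCounterAndGauges() {
    // MetricsRecordBuilder is a fluent API, so stub the chained calls to return the mock.
    MetricsRecordBuilder builder = mock(MetricsRecordBuilder.class);
    when(builder.addCounter(any(), anyLong())).thenReturn(builder);
    when(builder.addGauge(any(), anyDouble())).thenReturn(builder);

    Meter meter = new Meter();
    meter.mark(118);

    YarnShuffleServiceMetrics.collectMetric(builder, "blockTransferRateBytes", meter);

    // The Dropwizard count should pass through unchanged as a Hadoop counter,
    // and the four rates (1/5/15-minute and mean) should surface as Hadoop gauges.
    verify(builder).addCounter(any(), eq(118L));
    verify(builder, times(4)).addGauge(any(), anyDouble());
  }
}
```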

Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource",
String.class, String.class, MetricsSource.class);
registerSourceMethod.setAccessible(true);
registerSourceMethod.invoke(metricsSystem, "shuffleservice", "Metrics on the Spark " +
Contributor:

Nit: "shuffleService" camel case might be better.

Contributor Author:

Will change shortly.

@SparkQA commented Mar 24, 2017

Test build #75150 has finished for PR 17401 at commit 13aa4ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211 (Contributor Author) commented Mar 24, 2017

Thanks again for the comments @jerryshao! I've now added some tests to verify that the metrics get converted in the expected way to the collector, and camel-cased shuffleService.

@SparkQA commented Mar 24, 2017

Test build #75157 has finished for PR 17401 at commit 9992c10.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 24, 2017

Test build #75165 has finished for PR 17401 at commit 96a0882.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211 (Contributor Author) commented Mar 24, 2017

Ready for further review.

}

@VisibleForTesting
public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
Contributor:

Is it necessary to use static here? It looks like it is only for test convenience.

Contributor Author:

I use static here to make it clear that the method does not need to be run in the context of an instance. This prevents it from accidentally accessing instance variables when I don't intend it to.

import org.mockito.Matchers._
import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.Matchers
import scala.collection.JavaConverters._
Contributor:

The import ordering is not correct; the Scala package should come before third-party packages.

MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();

Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource",
String.class, String.class, MetricsSource.class);
@jerryshao (Contributor) commented Mar 27, 2017:

Nit: I think this should be a 2-space indent, following the line above.

String.class, String.class, MetricsSource.class);
registerSourceMethod.setAccessible(true);
registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " +
"Shuffle Service", serviceMetrics);
Contributor:

Also here.

@SparkQA commented Mar 28, 2017

Test build #75322 has finished for PR 17401 at commit 7c7d6d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211 (Contributor Author) commented Mar 28, 2017

@jerryshao ready for re-review

@jerryshao (Contributor) left a comment:

Sorry for the delay, just some style-related comments. We should ping others to take a review.

* A simple class to wrap all shuffle service wrapper metrics
*/
private class ShuffleMetrics implements MetricSet {
@VisibleForTesting
Contributor:

@VisibleForTesting causes classpath issues. Please note this in the java doc instead (SPARK-11615).

This is scalastyle output; it would be better to remove this annotation.

String authEnabledString = authEnabled ? "enabled" : "not enabled";
logger.info("Started YARN shuffle service for Spark on port {}. " +
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
Contributor:

Nit: this 2-space indent looks unnecessary.

}
}

@VisibleForTesting
Contributor:

Also here.

Gauge m = (Gauge) metric;
Object gaugeValue = m.getValue();
if (gaugeValue instanceof Integer) {
Integer intValue = (Integer) gaugeValue;
Contributor:

Does this mean we can only handle integer Gauges? What if we later add a different metric in ExternalShuffleBlockHandler? It looks like we would have to handle each case manually, one by one, here. At the very least we should add an else branch as a fallback check.
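
One possible shape for that fallback, as a rough sketch continuing the snippet above; it assumes the surrounding `collectMetric` parameters `metricsRecordBuilder` and `name`, plus Hadoop's `Interns.info` helper, and which extra numeric types are worth supporting is an open question.

```java
// Sketch: handle the common numeric gauge value types and fail loudly otherwise,
// rather than silently dropping the metric.
Gauge<?> gauge = (Gauge<?>) metric;
Object gaugeValue = gauge.getValue();
if (gaugeValue instanceof Integer) {
  metricsRecordBuilder.addGauge(Interns.info(name, "Gauge " + name), (Integer) gaugeValue);
} else if (gaugeValue instanceof Long) {
  metricsRecordBuilder.addGauge(Interns.info(name, "Gauge " + name), (Long) gaugeValue);
} else if (gaugeValue instanceof Float) {
  metricsRecordBuilder.addGauge(Interns.info(name, "Gauge " + name), (Float) gaugeValue);
} else if (gaugeValue instanceof Double) {
  metricsRecordBuilder.addGauge(Interns.info(name, "Gauge " + name), (Double) gaugeValue);
} else {
  throw new IllegalStateException(
    "Unsupported Gauge value type " + gaugeValue.getClass().getName() + " for metric " + name);
}
```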

@HyukjinKwon (Member):

Gentle ping @ash211. I just wonder whether this is still active.

@asfgit closed this in 3a45c7f on Aug 5, 2017
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

This PR is a follow-up of the closed apache#17401, which only ended due to inactivity, but it is still a nice feature to have.
The review feedback from jerryshao was taken into consideration and addressed:
- VisibleForTesting deleted because of dependency conflicts
- removed unnecessary reflection for `MetricsSystemImpl` (see the sketch after this list)
- added more available types for gauge
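
A hedged sketch of what the reflection-free registration might look like, assuming the public `MetricsSystem#register(name, desc, source)` API and the `shuffleService` name used earlier in this thread; the actual commit may use a different name or call site.

```java
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

// Registration without reflection: MetricsSystem#register is public and accepts a
// MetricsSource directly, so no getDeclaredMethod/setAccessible is needed.
static void registerShuffleServiceMetrics(MetricsSource serviceMetrics) {
  MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
  metricsSystem.register(
    "shuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
}
```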

## How was this patch tested?

Manual deploy of the new yarn-shuffle jar onto a Node Manager and verifying that the metrics appear in the standard Node Manager location. This is JMX, with a query endpoint running on `hostname:port`.

Resulting metrics look like this:
```
curl -sk -XGET hostname:port |  grep -v '#' | grep 'shuffleService'
hadoop_nodemanager_openblockrequestlatencymillis_rate15{name="shuffleService",} 0.31428910657834713
hadoop_nodemanager_blocktransferratebytes_rate15{name="shuffleService",} 566144.9983653595
hadoop_nodemanager_blocktransferratebytes_ratemean{name="shuffleService",} 2464409.9678099006
hadoop_nodemanager_openblockrequestlatencymillis_rate1{name="shuffleService",} 1.2893844732240272
hadoop_nodemanager_registeredexecutorssize{name="shuffleService",} 2.0
hadoop_nodemanager_openblockrequestlatencymillis_ratemean{name="shuffleService",} 1.255574678369966
hadoop_nodemanager_openblockrequestlatencymillis_count{name="shuffleService",} 315.0
hadoop_nodemanager_openblockrequestlatencymillis_rate5{name="shuffleService",} 0.7661929192569739
hadoop_nodemanager_registerexecutorrequestlatencymillis_ratemean{name="shuffleService",} 0.0
hadoop_nodemanager_registerexecutorrequestlatencymillis_count{name="shuffleService",} 0.0
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate1{name="shuffleService",} 0.0
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate5{name="shuffleService",} 0.0
hadoop_nodemanager_blocktransferratebytes_count{name="shuffleService",} 6.18271213E8
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate15{name="shuffleService",} 0.0
hadoop_nodemanager_blocktransferratebytes_rate5{name="shuffleService",} 1154114.4881816586
hadoop_nodemanager_blocktransferratebytes_rate1{name="shuffleService",} 574745.0749848988
```

Closes apache#22485 from mareksimunek/SPARK-18364.

Lead-authored-by: marek.simunek <[email protected]>
Co-authored-by: Andrew Ash <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
prakharjain09 pushed a commit to prakharjain09/spark that referenced this pull request Nov 29, 2019
(cherry picked from commit a802c69)
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances as apache#18017.

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL]Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL]Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP]allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD]Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <[email protected]>

Closes apache#18780 from HyukjinKwon/close-prs.
