Commit 194b5bf

Rerun plan stability suite
2 parents: 8f15d15 + 280a2f3

580 files changed: 41,845 additions & 20,245 deletions

.github/workflows/build_and_test.yml

Lines changed: 93 additions & 1 deletion
@@ -327,7 +327,7 @@ jobs:
       run: |
         apt-get install -y libcurl4-openssl-dev libgit2-dev libssl-dev libxml2-dev
         Rscript -e "install.packages(c('devtools'), repos='https://cloud.r-project.org/')"
-        Rscript -e "devtools::install_github('jimhester/lintr@v2.0.0')"
+        Rscript -e "devtools::install_github('jimhester/lintr@v2.0.1')"
         ./R/install-dev.sh
     - name: Install dependencies for documentation generation
       run: |
@@ -367,6 +367,17 @@ jobs:
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v2
+    - name: Cache Scala, SBT and Maven
+      uses: actions/cache@v2
+      with:
+        path: |
+          build/apache-maven-*
+          build/scala-*
+          build/*.jar
+          ~/.sbt
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
+        restore-keys: |
+          build-
     - name: Cache Maven local repository
       uses: actions/cache@v2
       with:
@@ -392,6 +403,17 @@ jobs:
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v2
+    - name: Cache Scala, SBT and Maven
+      uses: actions/cache@v2
+      with:
+        path: |
+          build/apache-maven-*
+          build/scala-*
+          build/*.jar
+          ~/.sbt
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
+        restore-keys: |
+          build-
     - name: Cache Coursier local repository
       uses: actions/cache@v2
       with:
@@ -414,6 +436,17 @@ jobs:
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v2
+    - name: Cache Scala, SBT and Maven
+      uses: actions/cache@v2
+      with:
+        path: |
+          build/apache-maven-*
+          build/scala-*
+          build/*.jar
+          ~/.sbt
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
+        restore-keys: |
+          build-
     - name: Cache Coursier local repository
       uses: actions/cache@v2
       with:
@@ -428,3 +461,62 @@ jobs:
     - name: Build with SBT
       run: |
         ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Phadoop-2.7 compile test:compile
+
+  tpcds-1g:
+    name: Run TPC-DS queries with SF=1
+    runs-on: ubuntu-20.04
+    steps:
+    - name: Checkout Spark repository
+      uses: actions/checkout@v2
+    - name: Cache TPC-DS generated data
+      id: cache-tpcds-sf-1
+      uses: actions/cache@v2
+      with:
+        path: ./tpcds-sf-1
+        key: tpcds-${{ hashFiles('tpcds-sf-1/.spark-tpcds-sf-1.md5') }}
+        restore-keys: |
+          tpcds-
+    - name: Checkout TPC-DS (SF=1) generated data repository
+      if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
+      uses: actions/checkout@v2
+      with:
+        repository: maropu/spark-tpcds-sf-1
+        ref: 6b660a53091bd6d23cbe58b0f09aae08e71cc667
+        path: ./tpcds-sf-1
+    - name: Cache Scala, SBT and Maven
+      uses: actions/cache@v2
+      with:
+        path: |
+          build/apache-maven-*
+          build/scala-*
+          build/*.jar
+          ~/.sbt
+        key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
+        restore-keys: |
+          build-
+    - name: Cache Coursier local repository
+      uses: actions/cache@v2
+      with:
+        path: ~/.cache/coursier
+        key: tpcds-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
+        restore-keys: |
+          tpcds-coursier-
+    - name: Install Java 8
+      uses: actions/setup-java@v1
+      with:
+        java-version: 8
+    - name: Run TPC-DS queries
+      run: |
+        SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite"
+    - name: Upload test results to report
+      if: always()
+      uses: actions/upload-artifact@v2
+      with:
+        name: test-results-tpcds--8-hadoop3.2-hive2.3
+        path: "**/target/test-reports/*.xml"
+    - name: Upload unit tests log files
+      if: failure()
+      uses: actions/upload-artifact@v2
+      with:
+        name: unit-tests-log-tpcds--8-hadoop3.2-hive2.3
+        path: "**/target/unit-tests.log"

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 3 additions & 2 deletions
@@ -165,8 +165,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
       if (hasInFlightRequests) {
         String address = getRemoteAddress(ctx.channel());
         logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
-          "requests. Assuming connection is dead; please adjust spark.network.timeout if " +
-          "this is wrong.", address, requestTimeoutNs / 1000 / 1000);
+          "requests. Assuming connection is dead; please adjust" +
+          " spark.{}.io.connectionTimeout if this is wrong.",
+          address, requestTimeoutNs / 1000 / 1000, transportContext.getConf().getModuleName());
         client.timeOut();
         ctx.close();
       } else if (closeIdleConnections) {

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 57 additions & 9 deletions
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -75,6 +76,20 @@
  * is because an application running on the same Yarn cluster may choose to not use the external
  * shuffle service, in which case its setting of `spark.authenticate` should be independent of
  * the service's.
+ *
+ * The shuffle service will produce metrics via the YARN NodeManager's {@code metrics2} system
+ * under a namespace specified by the {@value SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY} config.
+ *
+ * By default, all configurations for the shuffle service will be taken directly from the
+ * Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure
+ * the shuffle service by placing a resource named
+ * {@value SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an
+ * XML file in the standard Hadoop Configuration resource format. Note that when the shuffle
+ * service is loaded in the default manner, without configuring
+ * {@code yarn.nodemanager.aux-services.<service>.classpath}, this file must be on the classpath
+ * of the NodeManager itself. When using the {@code classpath} configuration, it can be present
+ * either on the NodeManager's classpath, or specified in the classpath configuration.
+ * This {@code classpath} configuration is only supported on YARN versions >= 2.9.0.
  */
 public class YarnShuffleService extends AuxiliaryService {
   private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
@@ -83,6 +98,14 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
   private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
 
+  /**
+   * The namespace to use for the metrics record which will contain all metrics produced by the
+   * shuffle service.
+   */
+  static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
+      "spark.yarn.shuffle.service.metrics.namespace";
+  private static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";
+
   // Whether the shuffle server should authenticate fetch requests
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
@@ -103,6 +126,13 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
       .StoreVersion(1, 0);
 
+  /**
+   * The name of the resource to search for on the classpath to find a shuffle service-specific
+   * configuration overlay. If found, this will be parsed as a standard Hadoop
+   * {@link Configuration config} file and will override the configs passed from the NodeManager.
+   */
+  static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = "spark-shuffle-site.xml";
+
   // just for integration tests that want to look at this file -- in general not sensible as
   // a static
   @VisibleForTesting
@@ -139,6 +169,13 @@ public class YarnShuffleService extends AuxiliaryService {
   private DB db;
 
   public YarnShuffleService() {
+    // The name of the auxiliary service configured within the NodeManager
+    // (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be
+    // arbitrary. The NodeManager will log a warning if the configured name doesn't match this name,
+    // to inform operators of a potential misconfiguration, but this name is otherwise not used.
+    // It is hard-coded instead of using the value of the `spark.shuffle.service.name` configuration
+    // because at this point in instantiation there is no Configuration object; it is not passed
+    // until `serviceInit` is called, at which point it's too late to adjust the name.
     super("spark_shuffle");
     logger.info("Initializing YARN shuffle service for Spark");
     instance = this;
@@ -157,10 +194,18 @@ private boolean isAuthenticationEnabled() {
    * Start the shuffle server with the given configuration.
    */
   @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    _conf = conf;
+  protected void serviceInit(Configuration externalConf) throws Exception {
+    _conf = new Configuration(externalConf);
+    URL confOverlayUrl = Thread.currentThread().getContextClassLoader()
+        .getResource(SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME);
+    if (confOverlayUrl != null) {
+      logger.info("Initializing Spark YARN shuffle service with configuration overlay from {}",
+          confOverlayUrl);
+      _conf.addResource(confOverlayUrl);
+    }
+    super.serviceInit(_conf);
 
-    boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
+    boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
 
     try {
       // In case this NM was killed while there were running spark applications, we need to restore
@@ -172,7 +217,7 @@ protected void serviceInit(Configuration conf) throws Exception {
       registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
     }
 
-    TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
+    TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
     MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
         transportConf);
     blockHandler = new ExternalBlockHandler(
@@ -181,7 +226,7 @@ protected void serviceInit(Configuration conf) throws Exception {
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
     List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
-    boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+    boolean authEnabled = _conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
     if (authEnabled) {
       secretManager = new ShuffleSecretManager();
       if (_recoveryPath != null) {
@@ -190,7 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception {
       bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
     }
 
-    int port = conf.getInt(
+    int port = _conf.getInt(
         SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
     transportContext = new TransportContext(transportConf, blockHandler, true);
     shuffleServer = transportContext.createServer(port, bootstraps);
@@ -203,13 +248,16 @@ protected void serviceInit(Configuration conf) throws Exception {
     blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
         shuffleServer.getRegisteredConnections());
     blockHandler.getAllMetrics().getMetrics().putAll(shuffleServer.getAllMetrics().getMetrics());
+    String metricsNamespace = _conf.get(SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY,
+        DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
     YarnShuffleServiceMetrics serviceMetrics =
-        new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
+        new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());
 
     MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
     metricsSystem.register(
-        "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
-    logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
+        metricsNamespace, "Metrics on the Spark Shuffle Service", serviceMetrics);
+    logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using namespace '{}'",
+        metricsNamespace);
 
     logger.info("Started YARN shuffle service for Spark on port {}. " +
         "Authentication is {}. Registered executor file is {}", port, authEnabledString,

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java

Lines changed: 4 additions & 2 deletions
@@ -32,9 +32,11 @@
  */
 class YarnShuffleServiceMetrics implements MetricsSource {
 
+  private final String metricsNamespace;
   private final MetricSet metricSet;
 
-  YarnShuffleServiceMetrics(MetricSet metricSet) {
+  YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) {
+    this.metricsNamespace = metricsNamespace;
     this.metricSet = metricSet;
   }
 
@@ -46,7 +48,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
    */
   @Override
   public void getMetrics(MetricsCollector collector, boolean all) {
-    MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
+    MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace);
 
     for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
       collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());

core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala

Lines changed: 19 additions & 10 deletions
@@ -53,10 +53,10 @@ private[spark] class ExecutorMetricsPoller(
 
   type StageKey = (Int, Int)
   // Task Count and Metric Peaks
-  private case class TCMP(count: AtomicLong, peaks: AtomicLongArray)
+  private[executor] case class TCMP(count: AtomicLong, peaks: AtomicLongArray)
 
   // Map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks)
-  private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]
+  private[executor] val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]
 
   // Map of taskId to executor metric peaks
   private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray]
@@ -124,17 +124,12 @@
    */
   def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit = {
     // Decrement the task count.
-    // Remove the entry from stageTCMP if the task count reaches zero.
 
     def decrementCount(stage: StageKey, countAndPeaks: TCMP): TCMP = {
       val countValue = countAndPeaks.count.decrementAndGet()
-      if (countValue == 0L) {
-        logDebug(s"removing (${stage._1}, ${stage._2}) from stageTCMP")
-        null
-      } else {
-        logDebug(s"stageTCMP: (${stage._1}, ${stage._2}) -> " + countValue)
-        countAndPeaks
-      }
+      assert(countValue >= 0, "task count shouldn't below 0")
+      logDebug(s"stageTCMP: (${stage._1}, ${stage._2}) -> " + countValue)
+      countAndPeaks
     }
 
     stageTCMP.computeIfPresent((stageId, stageAttemptId), decrementCount)
@@ -176,6 +171,20 @@
 
     stageTCMP.replaceAll(getUpdateAndResetPeaks)
 
+    def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
+      if (v.count.get == 0) {
+        logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
+        null
+      } else {
+        v
+      }
+    }
+
+    // Remove the entry from stageTCMP if the task count reaches zero.
+    executorUpdates.foreach { case (k, _) =>
+      stageTCMP.computeIfPresent(k, removeIfInactive)
+    }
+
     executorUpdates
   }
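The change above moves stage-entry removal out of onTaskCompletion and into getExecutorUpdates, relying on java.util.concurrent.ConcurrentHashMap.computeIfPresent: when the remapping function returns null, the mapping is removed atomically, which is how removeIfInactive drops a stage once its running-task count is zero and its peaks have been reported. A small, self-contained Java sketch of that removal idiom (the key and values here are made up for illustration, not Spark code):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the computeIfPresent removal idiom; not Spark code.
public class ComputeIfPresentRemovalSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, AtomicLong> runningTasks = new ConcurrentHashMap<>();
    runningTasks.put("stage 0, attempt 0", new AtomicLong(0));

    // Returning null from the remapping function removes the mapping atomically,
    // just as removeIfInactive does for stageTCMP in the diff above.
    runningTasks.computeIfPresent("stage 0, attempt 0",
        (key, count) -> count.get() == 0 ? null : count);

    System.out.println(runningTasks.containsKey("stage 0, attempt 0")); // prints false
  }
}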
