
Commit af2ec3c (2 parents: 343d15d + 0617dfc)

Merge remote-tracking branch 'origin/master' into partition-spec-value-null

# Conflicts:
#   sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowPartitionsExec.scala
#   sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

File tree

185 files changed: +5287 additions, -1668 deletions

Some content is hidden: large commits have some content hidden by default, so only a subset of the 185 changed files is rendered below.

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 10 additions & 0 deletions
@@ -398,4 +398,14 @@ public long mergedIndexCacheSize() {
     return JavaUtils.byteStringAsBytes(
       conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
   }
+
+  /**
+   * The threshold for number of IOExceptions while merging shuffle blocks to a shuffle partition.
+   * When the number of IOExceptions while writing to merged shuffle data/index/meta file exceed
+   * this threshold then the shuffle server will respond back to client to stop pushing shuffle
+   * blocks for this shuffle partition.
+   */
+  public int ioExceptionsThresholdDuringMerge() {
+    return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+  }
 }
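
As an aside, a minimal Scala sketch (not part of the commit) of how the new getter could be exercised in isolation, assuming the MapConfigProvider helper from the same network-common module; the key name and default of 4 come from the hunk above:

    import scala.collection.JavaConverters._
    import org.apache.spark.network.util.{MapConfigProvider, TransportConf}

    // Back a TransportConf with an in-memory map and read the new threshold;
    // "8" overrides the default of 4 used by ioExceptionsThresholdDuringMerge().
    val provider = new MapConfigProvider(
      Map("spark.shuffle.server.ioExceptionsThresholdDuringMerge" -> "8").asJava)
    val shuffleConf = new TransportConf("shuffle", provider)
    assert(shuffleConf.ioExceptionsThresholdDuringMerge() == 8)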

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java

Lines changed: 9 additions & 0 deletions
@@ -71,6 +71,15 @@ class BlockPushErrorHandler implements ErrorHandler {
   public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
       "Couldn't find an opportunity to write block";
 
+  /**
+   * String constant used for generating exception messages indicating the server encountered
+   * IOExceptions multiple times, greater than the configured threshold, while trying to merged
+   * shuffle blocks of the same shuffle partition. When the client receives this this response,
+   * it will stop pushing any more blocks for the same shuffle partition.
+   */
+  public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
+      "IOExceptions exceeded the threshold";
+
   @Override
   public boolean shouldRetryError(Throwable t) {
     // If it is a connection time out or a connection closed exception, no need to retry.
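
For orientation, a hedged sketch (not from the commit) of how calling code might recognize this response; it assumes BlockPushErrorHandler is the nested handler declared in ErrorHandler.java as the hunk header suggests, and the real decision logic lives in shouldRetryError and the push-based shuffle writer:

    import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler

    // A push failure whose message carries the new prefix should not be retried;
    // the client stops pushing further blocks for that shuffle partition.
    def exceededIoExceptionThreshold(t: Throwable): Boolean = {
      val message = Option(t.getMessage).getOrElse("")
      message.contains(BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX)
    }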

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java

Lines changed: 230 additions & 71 deletions
Large diffs are not rendered by default.

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java

Lines changed: 380 additions & 0 deletions
Large diffs are not rendered by default.

core/src/main/resources/org/apache/spark/ui/static/stagepage.js

Lines changed: 2 additions & 1 deletion
@@ -946,7 +946,8 @@ $(document).ready(function () {
         },
         {
           data : function (row, type) {
-            if (row.taskMetrics && row.taskMetrics.shuffleReadMetrics && row.taskMetrics.shuffleReadMetrics.localBytesRead > 0) {
+            if (row.taskMetrics && row.taskMetrics.shuffleReadMetrics &&
+              (row.taskMetrics.shuffleReadMetrics.localBytesRead > 0 || row.taskMetrics.shuffleReadMetrics.remoteBytesRead > 0)) {
               var totalBytesRead = parseInt(row.taskMetrics.shuffleReadMetrics.localBytesRead) + parseInt(row.taskMetrics.shuffleReadMetrics.remoteBytesRead);
               if (type === 'display') {
                 return formatBytes(totalBytesRead, type) + " / " + row.taskMetrics.shuffleReadMetrics.recordsRead;

core/src/main/scala/org/apache/spark/ContextAwareIterator.scala

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * If an off-heap access exists in the parent iterator, it could cause segmentation fault
+ * which crashes the executor.
+ * Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
+ */
+@DeveloperApi
+class ContextAwareIterator[+T](val context: TaskContext, val delegate: Iterator[T])
+  extends Iterator[T] {
+
+  override def hasNext: Boolean =
+    !context.isCompleted() && !context.isInterrupted() && delegate.hasNext
+
+  override def next(): T = delegate.next()
+}
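
A short usage sketch (illustrative only, not part of the commit) showing how code running inside a task might wrap a parent iterator with the new class:

    import org.apache.spark.{ContextAwareIterator, TaskContext}

    // Wrap a parent iterator so consumption stops once the task has completed
    // or been interrupted, rather than reading past the task's lifetime.
    def wrapForTask[T](parent: Iterator[T]): Iterator[T] = {
      val ctx = TaskContext.get() // non-null only inside a running task
      new ContextAwareIterator[T](ctx, parent)
    }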

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 28 additions & 17 deletions
@@ -1929,7 +1929,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-    def addLocalJarFile(file: File): String = {
+    def addLocalJarFile(file: File): Seq[String] = {
       try {
         if (!file.exists()) {
           throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1938,15 +1938,15 @@ class SparkContext(config: SparkConf) extends Logging {
           throw new IllegalArgumentException(
             s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
         }
-        env.rpcEnv.fileServer.addJar(file)
+        Seq(env.rpcEnv.fileServer.addJar(file))
       } catch {
         case NonFatal(e) =>
           logError(s"Failed to add $path to Spark environment", e)
-          null
+          Nil
       }
     }
 
-    def checkRemoteJarFile(path: String): String = {
+    def checkRemoteJarFile(path: String): Seq[String] = {
       val hadoopPath = new Path(path)
       val scheme = hadoopPath.toUri.getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
@@ -1959,47 +1959,58 @@
             throw new IllegalArgumentException(
               s"Directory ${path} is not allowed for addJar")
           }
-          path
+          Seq(path)
         } catch {
           case NonFatal(e) =>
             logError(s"Failed to add $path to Spark environment", e)
-            null
+            Nil
         }
       } else {
-        path
+        Seq(path)
       }
     }
 
     if (path == null || path.isEmpty) {
       logWarning("null or empty path specified as parameter to addJar")
     } else {
-      val key = if (path.contains("\\") && Utils.isWindows) {
+      val (keys, scheme) = if (path.contains("\\") && Utils.isWindows) {
         // For local paths with backslashes on Windows, URI throws an exception
-        addLocalJarFile(new File(path))
+        (addLocalJarFile(new File(path)), "local")
      } else {
        val uri = new Path(path).toUri
        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
        Utils.validateURL(uri)
-        uri.getScheme match {
+        val uriScheme = uri.getScheme
+        val jarPaths = uriScheme match {
          // A JAR file which exists only on the driver node
          case null =>
            // SPARK-22585 path without schema is not url encoded
            addLocalJarFile(new File(uri.getPath))
          // A JAR file which exists only on the driver node
          case "file" => addLocalJarFile(new File(uri.getPath))
          // A JAR file which exists locally on every worker node
-          case "local" => "file:" + uri.getPath
+          case "local" => Seq("file:" + uri.getPath)
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI.create(path)`
+            DependencyUtils.resolveMavenDependencies(URI.create(path))
+              .flatMap(jar => addLocalJarFile(new File(jar)))
          case _ => checkRemoteJarFile(path)
        }
+        (jarPaths, uriScheme)
      }
-      if (key != null) {
+      if (keys.nonEmpty) {
        val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+        val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+        if (added.nonEmpty) {
+          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+          logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
          postEnvironmentUpdate()
-        } else {
-          logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-            "is not supported in the current version.")
+        }
+        if (existed.nonEmpty) {
+          val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+          logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
+            " Overwriting of added jar is not supported in the current version.")
        }
      }
    }
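
A hedged usage sketch (not from the commit) of the new "ivy" branch; the Maven coordinate is illustrative, and any query parameters accepted by the ivy:// form are parsed by DependencyUtils.resolveMavenDependencies(URI), which is outside this hunk:

    import org.apache.spark.{SparkConf, SparkContext}

    // Add a Maven artifact to a running application through the ivy:// scheme
    // handled by addJar above; the coordinate below is an example only.
    val sc = new SparkContext(new SparkConf().setAppName("ivy-addjar-sketch").setMaster("local[*]"))
    sc.addJar("ivy://org.scalaj:scalaj-http_2.12:2.4.2")
    sc.stop()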

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 16 additions & 13 deletions
@@ -304,28 +304,29 @@ private[spark] class SparkSubmit extends Logging {
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-      args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
-      args.ivySettingsPath)
+      packagesTransitive = true, args.packagesExclusions, args.packages,
+      args.repositories, args.ivyRepoPath, args.ivySettingsPath)
 
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+    if (resolvedMavenCoordinates.nonEmpty) {
       // In K8s client mode, when in the driver, add resolved jars early as we might need
       // them at the submit time for artifact downloading.
       // For example we might use the dependencies for downloading
       // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
       // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
       if (isKubernetesClusterModeDriver) {
         val loader = getSubmitClassLoader(sparkConf)
-        for (jar <- resolvedMavenCoordinates.split(",")) {
+        for (jar <- resolvedMavenCoordinates) {
           addJarToClasspath(jar, loader)
         }
       } else if (isKubernetesCluster) {
         // We need this in K8s cluster mode so that we can upload local deps
         // via the k8s application, like in cluster mode driver
-        childClasspath ++= resolvedMavenCoordinates.split(",")
+        childClasspath ++= resolvedMavenCoordinates
       } else {
-        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
         if (args.isPython || isInternal(args.primaryResource)) {
-          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+          args.pyFiles = mergeFileLists(args.pyFiles,
+            mergeFileLists(resolvedMavenCoordinates: _*))
         }
       }
     }
@@ -1201,7 +1202,7 @@ private[spark] object SparkSubmitUtils {
    */
   def resolveDependencyPaths(
       artifacts: Array[AnyRef],
-      cacheDirectory: File): String = {
+      cacheDirectory: File): Seq[String] = {
     artifacts.map { artifactInfo =>
       val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
       val extraAttrs = artifactInfo.asInstanceOf[Artifact].getExtraAttributes
@@ -1212,7 +1213,7 @@
       }
       cacheDirectory.getAbsolutePath + File.separator +
         s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
-    }.mkString(",")
+    }
   }
 
   /** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -1360,17 +1361,19 @@
    * Resolves any dependencies that were supplied through maven coordinates
    * @param coordinates Comma-delimited string of maven coordinates
    * @param ivySettings An IvySettings containing resolvers to use
+   * @param transitive Whether resolving transitive dependencies, default is true
    * @param exclusions Exclusions to apply when resolving transitive dependencies
-   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   * @return Seq of path to the jars of the given maven artifacts including their
    *         transitive dependencies
    */
  def resolveMavenCoordinates(
      coordinates: String,
      ivySettings: IvySettings,
+      transitive: Boolean,
      exclusions: Seq[String] = Nil,
-      isTest: Boolean = false): String = {
+      isTest: Boolean = false): Seq[String] = {
    if (coordinates == null || coordinates.trim.isEmpty) {
-      ""
+      Nil
    } else {
      val sysOut = System.out
      // Default configuration name for ivy
@@ -1396,7 +1399,7 @@
      val ivy = Ivy.newInstance(ivySettings)
      // Set resolve options to download transitive dependencies as well
      val resolveOptions = new ResolveOptions
-      resolveOptions.setTransitive(true)
+      resolveOptions.setTransitive(transitive)
      val retrieveOptions = new RetrieveOptions
      // Turn downloading and logging off for testing
      if (isTest) {
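
A hedged sketch (not from the commit) of calling the updated resolver with the new transitive flag; SparkSubmitUtils is private[spark], so this compiles only from Spark-internal code, and buildIvySettings is assumed to keep its existing (Option[String], Option[String]) signature:

    import org.apache.spark.deploy.SparkSubmitUtils

    // resolveMavenCoordinates now returns a Seq[String] of jar paths instead of
    // a comma-joined string, and takes an explicit `transitive` flag.
    val ivySettings = SparkSubmitUtils.buildIvySettings(None, None)
    val jars: Seq[String] = SparkSubmitUtils.resolveMavenCoordinates(
      "org.scalaj:scalaj-http_2.12:2.4.2",
      ivySettings,
      transitive = false)
    jars.foreach(println)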

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 8 additions & 15 deletions
@@ -19,10 +19,8 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util._
@@ -79,21 +77,16 @@ object DriverWrapper extends Logging {
     val secMgr = new SecurityManager(sparkConf)
     val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
 
-    val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
-      Seq(
-        "spark.jars.excludes",
-        "spark.jars.packages",
-        "spark.jars.repositories",
-        "spark.jars.ivy",
-        "spark.jars.ivySettings"
-      ).map(sys.props.get(_).orNull)
+    val ivyProperties = DependencyUtils.getIvyProperties()
 
-    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
-      packages, repositories, ivyRepoPath, Option(ivySettingsPath))
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(true,
+      ivyProperties.packagesExclusions, ivyProperties.packages, ivyProperties.repositories,
+      ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath))
     val jars = {
       val jarsProp = sys.props.get(config.JARS.key).orNull
-      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-        DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+      if (resolvedMavenCoordinates.nonEmpty) {
+        DependencyUtils.mergeFileLists(jarsProp,
+          DependencyUtils.mergeFileLists(resolvedMavenCoordinates: _*))
       } else {
         jarsProp
       }

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.status.api.v1
+import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
@@ -88,7 +89,7 @@
     } else {
       base
     }
-    filtered.asScala.map(_.info).toSeq
+    filtered.asScala.map(_.info).filter(_.id != FALLBACK_BLOCK_MANAGER_ID.executorId).toSeq
   }
 
   def executorSummary(executorId: String): v1.ExecutorSummary = {
