core/src/main/scala/org/apache/spark/SparkConf.scala (2 additions, 4 deletions)
@@ -606,6 +606,8 @@ private[spark] object SparkConf extends Logging {
         "Please use the new excludedOnFailure options, spark.excludeOnFailure.*"),
       DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
       DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
+      DeprecatedConfig("spark.rpc.numRetries", "2.2.0", "Not used anymore"),
+      DeprecatedConfig("spark.rpc.retry.wait", "2.2.0", "Not used anymore"),
       DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
         "Not used anymore. Please use spark.shuffle.service.index.cache.size"),
       DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
@@ -682,10 +684,6 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
     IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
       AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
-    RPC_NUM_RETRIES.key -> Seq(
-      AlternateConfig("spark.akka.num.retries", "1.4")),
-    RPC_RETRY_WAIT.key -> Seq(
-      AlternateConfig("spark.akka.retry.wait", "1.4")),
    RPC_ASK_TIMEOUT.key -> Seq(
       AlternateConfig("spark.akka.askTimeout", "1.4")),
     RPC_LOOKUP_TIMEOUT.key -> Seq(
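
For orientation, a minimal sketch (not part of this change; the object name and values are made up) of what moving these keys into the deprecation table means for users: SparkConf still accepts them, but it only logs a deprecation warning, and with the spark.akka.* aliases removed above they no longer map to any live config entry.

import org.apache.spark.SparkConf

// Illustrative only: the keys are still stored as raw entries, so user code that
// echoes them keeps working, but Spark's RPC layer no longer consults them.
object DeprecatedRpcRetryDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.rpc.numRetries", "5")  // logs a deprecation warning, value unused
      .set("spark.rpc.retry.wait", "1s") // logs a deprecation warning, value unused
    println(conf.get("spark.rpc.numRetries")) // prints 5
  }
}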
core/src/main/scala/org/apache/spark/internal/config/Network.scala (12 deletions)
@@ -92,16 +92,4 @@ private[spark] object Network {
       .version("1.6.0")
       .intConf
       .createOptional
-
-  private[spark] val RPC_NUM_RETRIES =
-    ConfigBuilder("spark.rpc.numRetries")
-      .version("1.4.0")
-      .intConf
-      .createWithDefault(3)
-
-  private[spark] val RPC_RETRY_WAIT =
-    ConfigBuilder("spark.rpc.retry.wait")
-      .version("1.4.0")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("3s")
 }
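
As a reminder of what the deleted typed entries did (a sketch using the public string API and a hypothetical key, not code from this patch): intConf read the value as an Int, and timeConf(TimeUnit.MILLISECONDS) parsed suffixed strings such as "3s" into milliseconds, which is also what getTimeAsMs does for plain keys.

import org.apache.spark.SparkConf

object TimeConfParsingDemo {
  def main(args: Array[String]): Unit = {
    // "spark.example.wait" is a hypothetical key used only for illustration.
    val conf = new SparkConf().set("spark.example.wait", "3s")
    // getTimeAsMs parses time strings the same way timeConf(TimeUnit.MILLISECONDS) did.
    println(conf.getTimeAsMs("spark.example.wait", "1s")) // 3000
  }
}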
core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala (2 deletions)
@@ -30,8 +30,6 @@ import org.apache.spark.util.RpcUtils
 private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   extends Serializable with Logging {
 
-  private[this] val maxRetries = RpcUtils.numRetries(conf)
-  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
   private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
 
   /**
core/src/main/scala/org/apache/spark/util/RpcUtils.scala (10 deletions)
@@ -36,16 +36,6 @@ private[spark] object RpcUtils {
     rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
   }
 
-  /** Returns the configured number of times to retry connecting */
-  def numRetries(conf: SparkConf): Int = {
-    conf.get(RPC_NUM_RETRIES)
-  }
-
-  /** Returns the configured number of milliseconds to wait on each retry */
-  def retryWaitMs(conf: SparkConf): Long = {
-    conf.get(RPC_RETRY_WAIT)
-  }
-
   /** Returns the default Spark timeout to use for RPC ask operations. */
   def askRpcTimeout(conf: SparkConf): RpcTimeout = {
     RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
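
With numRetries and retryWaitMs gone, askRpcTimeout is the remaining helper here; as the retained line shows, it resolves spark.rpc.askTimeout first, then spark.network.timeout, then falls back to 120s. A hedged check of that chain (RpcUtils is private[spark], so the sketch assumes a package under org.apache.spark; the package and object names are made up):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

object AskTimeoutFallbackCheck {
  def main(args: Array[String]): Unit = {
    // Only spark.network.timeout is set, so the ask timeout falls back to it.
    val conf = new SparkConf().set("spark.network.timeout", "200s")
    assert(RpcUtils.askRpcTimeout(conf).duration.toSeconds == 200)
    // With neither key set, the hard-coded "120s" default applies.
    assert(RpcUtils.askRpcTimeout(new SparkConf()).duration.toSeconds == 120)
  }
}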
core/src/test/scala/org/apache/spark/SparkConfSuite.scala (8 deletions)
@@ -284,17 +284,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
   test("akka deprecated configs") {
     val conf = new SparkConf()
 
-    assert(!conf.contains(RPC_NUM_RETRIES))
-    assert(!conf.contains(RPC_RETRY_WAIT))
     assert(!conf.contains(RPC_ASK_TIMEOUT))
     assert(!conf.contains(RPC_LOOKUP_TIMEOUT))
 
-    conf.set("spark.akka.num.retries", "1")
-    assert(RpcUtils.numRetries(conf) === 1)
-
-    conf.set("spark.akka.retry.wait", "2")
-    assert(RpcUtils.retryWaitMs(conf) === 2L)
-
     conf.set("spark.akka.askTimeout", "3")
     assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)
 
core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala (4 deletions)
@@ -171,8 +171,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val conf = new SparkConf()
     val shortProp = "spark.rpc.short.timeout"
-    conf.set(Network.RPC_RETRY_WAIT, 0L)
-    conf.set(Network.RPC_NUM_RETRIES, 1)
     val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
@@ -203,8 +201,6 @@
 
     val conf = new SparkConf()
     val shortProp = "spark.rpc.short.timeout"
-    conf.set(Network.RPC_RETRY_WAIT, 0L)
-    conf.set(Network.RPC_NUM_RETRIES, 1)
     val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-abort")
docs/configuration.md (17 deletions)
@@ -2090,23 +2090,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>1.1.1</td>
 </tr>
-<tr>
-  <td><code>spark.rpc.numRetries</code></td>
-  <td>3</td>
-  <td>
-    Number of times to retry before an RPC task gives up.
-    An RPC task will run at most times of this number.
-  </td>
-  <td>1.4.0</td>
-</tr>
-<tr>
-  <td><code>spark.rpc.retry.wait</code></td>
-  <td>3s</td>
-  <td>
-    Duration for an RPC ask operation to wait before retrying.
-  </td>
-  <td>1.4.0</td>
-</tr>
 <tr>
   <td><code>spark.rpc.askTimeout</code></td>
   <td><code>spark.network.timeout</code></td>
@@ -35,7 +35,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
-import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -250,9 +249,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
-      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
-      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
-      .set(RPC_NUM_RETRIES, 1)
     val opId = 0
     val dir1 = newDir()
     val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0), UUID.randomUUID)
@@ -1178,7 +1174,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(metricNew.desc === "new desc", "incorrect description in copied instance")
     assert(metricNew.name === "m1", "incorrect name in copied instance")
 
-    val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763").set(RPC_NUM_RETRIES, 1)
+    val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763")
     withSpark(new SparkContext(conf)) { sc =>
       val sqlMetric = metric.createSQLMetric(sc)
       assert(sqlMetric != null)