diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cf121749b734..00a0f61ab47b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -606,6 +606,8 @@ private[spark] object SparkConf extends Logging {
         "Please use the new excludedOnFailure options, spark.excludeOnFailure.*"),
       DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
       DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
+      DeprecatedConfig("spark.rpc.numRetries", "2.2.0", "Not used anymore"),
+      DeprecatedConfig("spark.rpc.retry.wait", "2.2.0", "Not used anymore"),
       DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
         "Not used anymore. Please use spark.shuffle.service.index.cache.size"),
       DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
@@ -682,10 +684,6 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
     IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
       AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
-    RPC_NUM_RETRIES.key -> Seq(
-      AlternateConfig("spark.akka.num.retries", "1.4")),
-    RPC_RETRY_WAIT.key -> Seq(
-      AlternateConfig("spark.akka.retry.wait", "1.4")),
     RPC_ASK_TIMEOUT.key -> Seq(
       AlternateConfig("spark.akka.askTimeout", "1.4")),
     RPC_LOOKUP_TIMEOUT.key -> Seq(
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Network.scala b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
index 0961d062cc04..568394208390 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Network.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
@@ -92,16 +92,4 @@ private[spark] object Network {
       .version("1.6.0")
       .intConf
       .createOptional
-
-  private[spark] val RPC_NUM_RETRIES =
-    ConfigBuilder("spark.rpc.numRetries")
-      .version("1.4.0")
-      .intConf
-      .createWithDefault(3)
-
-  private[spark] val RPC_RETRY_WAIT =
-    ConfigBuilder("spark.rpc.retry.wait")
-      .version("1.4.0")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("3s")
 }
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index a3d27b0d0992..925dcdba0732 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -30,8 +30,6 @@ import org.apache.spark.util.RpcUtils
 private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   extends Serializable with Logging {
 
-  private[this] val maxRetries = RpcUtils.numRetries(conf)
-  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
   private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 0e4debc59534..30f5fced5a8b 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -36,16 +36,6 @@ private[spark] object RpcUtils {
     rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
   }
 
-  /** Returns the configured number of times to retry connecting */
-  def numRetries(conf: SparkConf): Int = {
-    conf.get(RPC_NUM_RETRIES)
-  }
-
-  /** Returns the configured number of milliseconds to wait on each retry */
-  def retryWaitMs(conf: SparkConf): Long = {
-    conf.get(RPC_RETRY_WAIT)
-  }
-
   /** Returns the default Spark timeout to use for RPC ask operations. */
   def askRpcTimeout(conf: SparkConf): RpcTimeout = {
     RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7779fb2aeaf0..7ae1eac1db18 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -284,17 +284,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
   test("akka deprecated configs") {
     val conf = new SparkConf()
 
-    assert(!conf.contains(RPC_NUM_RETRIES))
-    assert(!conf.contains(RPC_RETRY_WAIT))
     assert(!conf.contains(RPC_ASK_TIMEOUT))
     assert(!conf.contains(RPC_LOOKUP_TIMEOUT))
 
-    conf.set("spark.akka.num.retries", "1")
-    assert(RpcUtils.numRetries(conf) === 1)
-
-    conf.set("spark.akka.retry.wait", "2")
-    assert(RpcUtils.retryWaitMs(conf) === 2L)
-
     conf.set("spark.akka.askTimeout", "3")
     assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)
 
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acd6049a0c6b..1681bf9b57b3 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -171,8 +171,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val conf = new SparkConf()
     val shortProp = "spark.rpc.short.timeout"
-    conf.set(Network.RPC_RETRY_WAIT, 0L)
-    conf.set(Network.RPC_NUM_RETRIES, 1)
     val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
@@ -203,8 +201,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val conf = new SparkConf()
     val shortProp = "spark.rpc.short.timeout"
-    conf.set(Network.RPC_RETRY_WAIT, 0L)
-    conf.set(Network.RPC_NUM_RETRIES, 1)
     val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-abort")
diff --git a/docs/configuration.md b/docs/configuration.md
index 3374be026069..e01558a309ab 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2090,23 +2090,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>1.1.1</td>
 </tr>
-<tr>
-  <td><code>spark.rpc.numRetries</code></td>
-  <td>3</td>
-  <td>
-    Number of times to retry before an RPC task gives up.
-    An RPC task will run at most times of this number.
-  </td>
-  <td>1.4.0</td>
-</tr>
-<tr>
-  <td><code>spark.rpc.retry.wait</code></td>
-  <td>3s</td>
-  <td>
-    Duration for an RPC ask operation to wait before retrying.
-  </td>
-  <td>1.4.0</td>
-</tr>
 <tr>
   <td><code>spark.rpc.askTimeout</code></td>
   <td><code>spark.network.timeout</code></td>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dde925bb2d96..f74a7a0aabb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -35,7 +35,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
-import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -250,9 +249,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
-      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
-      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
-      .set(RPC_NUM_RETRIES, 1)
     val opId = 0
     val dir1 = newDir()
     val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0), UUID.randomUUID)
@@ -1178,7 +1174,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(metricNew.desc === "new desc", "incorrect description in copied instance")
     assert(metricNew.name === "m1", "incorrect name in copied instance")
 
-    val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763").set(RPC_NUM_RETRIES, 1)
+    val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763")
     withSpark(new SparkContext(conf)) { sc =>
       val sqlMetric = metric.createSQLMetric(sc)
       assert(sqlMetric != null)
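
Note (not part of the patch itself): with the two keys reduced to DeprecatedConfig stubs, setting either of them only produces a SparkConf deprecation warning; no RPC code path reads them anymore. A minimal sketch of that behavior follows; the object name DeprecatedRpcRetryConfs is made up for illustration.

import org.apache.spark.SparkConf

// Illustrative sketch only. After this change, "spark.rpc.numRetries" and
// "spark.rpc.retry.wait" appear in SparkConf.deprecatedConfigs, so set()
// logs a "Not used anymore" deprecation warning; the values are stored but
// never consulted by RpcEndpointRef or RpcUtils.
object DeprecatedRpcRetryConfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.rpc.numRetries", "5")   // warns, otherwise a no-op for the RPC layer
      .set("spark.rpc.retry.wait", "1s")  // warns, otherwise a no-op for the RPC layer

    // The plain string values are still retrievable; they just have no effect.
    assert(conf.get("spark.rpc.numRetries") == "5")
    assert(conf.get("spark.rpc.retry.wait") == "1s")
  }
}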