diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 8e7ecf500ed5..e8d3c5af9a1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -381,6 +381,8 @@ public boolean useOldFetchProtocol() { * 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'. * To turn on push-based shuffle at a cluster level, set the configuration to * 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'. + * + * Push-based shuffle is not yet supported. */ public String mergedShuffleFileManagerImpl() { return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl", diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index cb6d5d0ca203..5f8ba55bd67c 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -220,6 +220,9 @@ protected void serviceInit(Configuration externalConf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf)); MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance( transportConf); + if (!(shuffleMergeManager instanceof ExternalBlockHandler.NoOpMergedShuffleFileManager)) { + throw new UnsupportedOperationException("Push-based shuffle is not yet supported."); + } blockHandler = new ExternalBlockHandler( transportConf, registeredExecutorFile, shuffleMergeManager); diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 39c526cb0e8b..149fcfc48a99 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2098,7 +2098,7 @@ package object config { "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " + "which needs to be set with the appropriate " + "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " + - "shuffle to be enabled") + "shuffle to be enabled. Push-based shuffle is not yet supported.") .version("3.1.0") .booleanConf .createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 89aa29925059..ffca4b3b5280 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2595,10 +2595,14 @@ private[spark] object Utils extends Logging { * to run in YARN mode, with external shuffle service enabled */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { - conf.get(PUSH_BASED_SHUFFLE_ENABLED) && + val isPushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED) && (conf.get(IS_TESTING).getOrElse(false) || (conf.get(SHUFFLE_SERVICE_ENABLED) && conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn")) + if (isPushBasedShuffleEnabled && !conf.get(IS_TESTING).getOrElse(false)) { + throw new UnsupportedOperationException("Push-based shuffle is not yet supported.") + } + isPushBasedShuffleEnabled } /** diff --git a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala index 33f544a3911c..661b8a52b389 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.matchers.should.Matchers._ import org.apache.spark._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.TransportServer @@ -136,6 +137,7 @@ class HostLocalShuffleReadingSuite extends SparkFunSuite with Matchers with Loca test("Enable host local shuffle reading when push based shuffle is enabled") { val conf = new SparkConf() + .set(IS_TESTING, true) .set(SHUFFLE_SERVICE_ENABLED, true) .set("spark.yarn.maxAttempts", "1") .set(PUSH_BASED_SHUFFLE_ENABLED, true) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 095dbefdb24a..477e0189ff4b 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1448,7 +1448,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set(SHUFFLE_SERVICE_ENABLED, true) conf.set(SparkLauncher.SPARK_MASTER, "yarn") conf.set("spark.yarn.maxAttempts", "1") - assert(Utils.isPushBasedShuffleEnabled(conf) === true) + assertThrows[UnsupportedOperationException](Utils.isPushBasedShuffleEnabled(conf)) conf.set("spark.yarn.maxAttempts", "2") assert(Utils.isPushBasedShuffleEnabled(conf) === true) }