From 79eb226a1c5cc260922361f762636f188831fe8e Mon Sep 17 00:00:00 2001 From: Xianming Lei Date: Wed, 11 Jun 2025 17:05:30 +0800 Subject: [PATCH 1/3] [CELEBORN-1719][FOLLOWUP] Rename throwsFetchFailure to stageRerunEnabled --- .../Celeborn-Optimize-Skew-Partitions-spark3_2.patch | 4 ++-- .../Celeborn-Optimize-Skew-Partitions-spark3_3.patch | 4 ++-- .../Celeborn-Optimize-Skew-Partitions-spark3_4.patch | 4 ++-- .../Celeborn-Optimize-Skew-Partitions-spark3_5.patch | 4 ++-- .../spark/shuffle/celeborn/SparkShuffleManager.java | 2 +- .../org/apache/spark/shuffle/celeborn/SparkUtils.java | 2 +- .../spark/shuffle/celeborn/CelebornShuffleHandle.scala | 6 +++--- .../spark/shuffle/celeborn/CelebornShuffleReader.scala | 6 +++--- .../spark/shuffle/celeborn/SparkShuffleManager.java | 2 +- .../org/apache/spark/shuffle/celeborn/SparkUtils.java | 2 +- .../spark/shuffle/celeborn/CelebornShuffleHandle.scala | 6 +++--- .../spark/shuffle/celeborn/CelebornShuffleReader.scala | 8 ++++---- .../apache/celeborn/client/read/CelebornInputStream.java | 2 +- docs/migration.md | 2 ++ .../celeborn/tests/spark/CelebornFetchFailureSuite.scala | 6 +++--- 15 files changed, 31 insertions(+), 29 deletions(-) diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch index 509105cf1e2..f02902d6488 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch @@ -345,8 +345,8 @@ index 3609548f374..d34f43bf064 100644 + val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = + CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled -+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { ++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled ++ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId) + } diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch index 44c3f8a9707..2b0aecf0b16 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_3.patch @@ -349,8 +349,8 @@ index af689db3379..39d0b3132ee 100644 + val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = + CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled -+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { ++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled ++ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId) + } diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch index 27f7f418806..03b26e07a6b 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_4.patch @@ -349,8 +349,8 @@ index dbed66683b0..d656c8af6b7 100644 + val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = + CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled -+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { ++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled ++ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId) + } diff --git a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch index d49a6c2c4c4..b9fc88b795e 100644 --- a/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch +++ b/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5.patch @@ -361,8 +361,8 @@ index 9370b3d8d1d..d36e26a1376 100644 + val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled = + CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle + -+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled -+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { ++ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled ++ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) { + logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed") + CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId) + } diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 97c393f3b5d..563f00b9110 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -209,7 +209,7 @@ public ShuffleWriter getWriter( celebornConf, h.userIdentifier(), h.extension()); - if (h.throwsFetchFailure()) { + if (h.stageRerunEnabled()) { SparkUtils.addFailureListenerIfBarrierTask(client, context, h); } int shuffleId = SparkUtils.celebornShuffleId(client, h, context, true); diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index cff05deb268..9629fabfa17 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -160,7 +160,7 @@ public static int celebornShuffleId( CelebornShuffleHandle handle, TaskContext context, Boolean isWriter) { - if (handle.throwsFetchFailure()) { + if (handle.stageRerunEnabled()) { String appShuffleIdentifier = getAppShuffleIdentifier(handle.shuffleId(), context); Tuple2 res = client.getShuffleId( diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala index 4ae52720c42..6b925c21fab 100644 --- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala +++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala @@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C]( val lifecycleManagerPort: Int, val userIdentifier: UserIdentifier, shuffleId: Int, - val throwsFetchFailure: Boolean, + val stageRerunEnabled: Boolean, numMappers: Int, dependency: ShuffleDependency[K, V, C], val extension: Array[Byte]) @@ -39,7 +39,7 @@ class CelebornShuffleHandle[K, V, C]( lifecycleManagerPort: Int, userIdentifier: UserIdentifier, shuffleId: Int, - throwsFetchFailure: Boolean, + stageRerunEnabled: Boolean, numMappers: Int, dependency: ShuffleDependency[K, V, C]) = this( appUniqueId, @@ -47,7 +47,7 @@ class CelebornShuffleHandle[K, V, C]( lifecycleManagerPort, userIdentifier, shuffleId, - throwsFetchFailure, + stageRerunEnabled, numMappers, dependency, null) diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index df63a94b15c..2372305c56e 100644 --- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -69,7 +69,7 @@ class CelebornShuffleReader[K, C]( } catch { case e: CelebornRuntimeException => logError(s"Failed to get shuffleId for appShuffleId ${handle.shuffleId}", e) - if (handle.throwsFetchFailure) { + if (handle.stageRerunEnabled) { throw new FetchFailedException( null, handle.shuffleId, @@ -142,7 +142,7 @@ class CelebornShuffleReader[K, C]( if (exceptionRef.get() != null) { exceptionRef.get() match { case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - if (handle.throwsFetchFailure && + if (handle.stageRerunEnabled && shuffleClient.reportShuffleFetchFailure( handle.shuffleId, shuffleId, @@ -179,7 +179,7 @@ class CelebornShuffleReader[K, C]( iter } catch { case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - if (handle.throwsFetchFailure && + if (handle.stageRerunEnabled && shuffleClient.reportShuffleFetchFailure( handle.shuffleId, shuffleId, diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 151ce6e417a..b6555aa6135 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -288,7 +288,7 @@ public ShuffleWriter getWriter( celebornConf, h.userIdentifier(), h.extension()); - if (h.throwsFetchFailure()) { + if (h.stageRerunEnabled()) { SparkUtils.addFailureListenerIfBarrierTask(shuffleClient, context, h); } int shuffleId = SparkUtils.celebornShuffleId(shuffleClient, h, context, true); diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index 4a5bfdbb72f..c5dfb81b14f 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -133,7 +133,7 @@ public static int celebornShuffleId( CelebornShuffleHandle handle, TaskContext context, Boolean isWriter) { - if (handle.throwsFetchFailure()) { + if (handle.stageRerunEnabled()) { String appShuffleIdentifier = SparkCommonUtils.encodeAppShuffleIdentifier(handle.shuffleId(), context); Tuple2 res = diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala index 3d0180a267a..46f2ef8a472 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleHandle.scala @@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C]( val lifecycleManagerPort: Int, val userIdentifier: UserIdentifier, shuffleId: Int, - val throwsFetchFailure: Boolean, + val stageRerunEnabled: Boolean, val numMappers: Int, dependency: ShuffleDependency[K, V, C], val extension: Array[Byte]) @@ -39,7 +39,7 @@ class CelebornShuffleHandle[K, V, C]( lifecycleManagerPort: Int, userIdentifier: UserIdentifier, shuffleId: Int, - throwsFetchFailure: Boolean, + stageRerunEnabled: Boolean, numMappers: Int, dependency: ShuffleDependency[K, V, C]) = this( appUniqueId, @@ -47,7 +47,7 @@ class CelebornShuffleHandle[K, V, C]( lifecycleManagerPort, userIdentifier, shuffleId, - throwsFetchFailure, + stageRerunEnabled, numMappers, dependency, null) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index d202d377979..77257661705 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -94,7 +94,7 @@ class CelebornShuffleReader[K, C]( handle.extension) private val exceptionRef = new AtomicReference[IOException] - private val throwsFetchFailure = handle.throwsFetchFailure + private val stageRerunEnabled = handle.stageRerunEnabled private val encodedAttemptId = SparkCommonUtils.getEncodedAttemptNumber(context) override def read(): Iterator[Product2[K, C]] = { @@ -107,7 +107,7 @@ class CelebornShuffleReader[K, C]( } catch { case e: CelebornRuntimeException => logError(s"Failed to get shuffleId for appShuffleId ${handle.shuffleId}", e) - if (throwsFetchFailure) { + if (stageRerunEnabled) { throw new FetchFailedException( null, handle.shuffleId, @@ -329,7 +329,7 @@ class CelebornShuffleReader[K, C]( context.taskAttemptId(), startMapIndex, endMapIndex, - if (throwsFetchFailure) ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER + if (stageRerunEnabled) ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER else null, locationList, streamHandlers, @@ -513,7 +513,7 @@ class CelebornShuffleReader[K, C]( shuffleId: Int, partitionId: Int, ce: Throwable) = { - if (throwsFetchFailure && + if (stageRerunEnabled && shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, context.taskAttemptId())) { logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", ce) throw new FetchFailedException( diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java index 5b6f8ef7016..ad91fc38165 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java +++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java @@ -853,7 +853,7 @@ private boolean fillBuffer() throws IOException { if (exceptionMaker != null) { if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, taskId)) { /* - * [[ExceptionMaker.makeException]], for spark applications with celeborn.client.spark.fetch.throwsFetchFailure enabled will result in creating + * [[ExceptionMaker.makeException]], for spark applications with celeborn.client.spark.stageRerun.enabled enabled will result in creating * a FetchFailedException; and that will make the TaskContext as failed with shuffle fetch issues - see SPARK-19276 for more. * Given this, Celeborn can wrap the FetchFailedException with our CelebornIOException */ diff --git a/docs/migration.md b/docs/migration.md index b3105e3e1e3..a88904701cd 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -23,6 +23,8 @@ license: | # Upgrading from 0.5 to 0.6 +- Since 0.6.0, Celeborn deprecate `celeborn.client.spark.fetch.throwsFetchFailure`. Please use `celeborn.client.spark.stageRerun.enabled` instead. +- - Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` if you want to set user level quota. - Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` if you want to set user level quota. diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala index 9db3912a78f..ec9bb479e4a 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala @@ -85,7 +85,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite } } - test("celeborn spark integration test - unregister shuffle with throwsFetchFailure disabled") { + test("celeborn spark integration test - unregister shuffle with stageRerun disabled") { if (Spark3OrNewer) { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2,3]") val sparkSession = SparkSession.builder() @@ -241,7 +241,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite } } - test(s"celeborn spark integration test - resubmit an unordered barrier stage with throwsFetchFailure enabled") { + test(s"celeborn spark integration test - resubmit an unordered barrier stage with stageRerun enabled") { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]") val sparkSession = SparkSession.builder() .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) @@ -285,7 +285,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite } } - test(s"celeborn spark integration test - fetch failure in child of an unordered barrier stage with throwsFetchFailure enabled") { + test(s"celeborn spark integration test - fetch failure in child of an unordered barrier stage with stageRerun enabled") { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]") val sparkSession = SparkSession.builder() .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) From 1cd5e4af4e3b308f84de0827f39104aa783663c5 Mon Sep 17 00:00:00 2001 From: Xianming Lei Date: Wed, 11 Jun 2025 17:16:58 +0800 Subject: [PATCH 2/3] fix --- .../org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala index d4e57a47c9e..58ca1b01115 100644 --- a/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala @@ -54,7 +54,7 @@ class SparkUtilsSuite extends AnyFunSuite .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) .config("spark.sql.shuffle.partitions", 2) .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false) - .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true") + .config("spark.celeborn.client.spark.stageRerun.enabled", "true") .config( "spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager") @@ -115,7 +115,7 @@ class SparkUtilsSuite extends AnyFunSuite .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) .config("spark.sql.shuffle.partitions", 2) .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false) - .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true") + .config("spark.celeborn.client.spark.stageRerun.enabled", "true") .config( "spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager") @@ -167,7 +167,7 @@ class SparkUtilsSuite extends AnyFunSuite .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) .config("spark.sql.shuffle.partitions", 2) .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false) - .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true") + .config("spark.celeborn.client.spark.stageRerun.enabled", "true") .config( "spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager") From 576c8e4d911bf55faf77cab6bf8e7517f4cdf975 Mon Sep 17 00:00:00 2001 From: Xianming Lei Date: Wed, 11 Jun 2025 17:28:18 +0800 Subject: [PATCH 3/3] fix --- .../celeborn/tests/spark/CelebornFetchFailureSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala index ec9bb479e4a..58cfe27594d 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala @@ -241,7 +241,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite } } - test(s"celeborn spark integration test - resubmit an unordered barrier stage with stageRerun enabled") { + test( + s"celeborn spark integration test - resubmit an unordered barrier stage with stageRerun enabled") { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]") val sparkSession = SparkSession.builder() .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) @@ -285,7 +286,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite } } - test(s"celeborn spark integration test - fetch failure in child of an unordered barrier stage with stageRerun enabled") { + test(s"celeborn spark integration test - fetch failure in child of an unordered " + + s"barrier stage with stageRerun enabled") { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]") val sparkSession = SparkSession.builder() .config(updateSparkConf(sparkConf, ShuffleMode.HASH))