From ff6a9e9e0b4939f665848ee1a407e9e32cb0200e Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Tue, 15 Oct 2024 19:22:16 +0800 Subject: [PATCH 1/2] [CELEBORN-1648] Refine AppUniqueId with UUID suffix --- .../v2/app/MRAppMasterWithCeleborn.java | 8 ++++---- .../shuffle/celeborn/SparkShuffleManager.java | 7 ++++--- .../shuffle/celeborn/SparkShuffleManager.java | 7 ++++--- .../apache/celeborn/common/CelebornConf.scala | 19 ++++++++++++++++++- docs/configuration/client.md | 1 + 5 files changed, 31 insertions(+), 11 deletions(-) diff --git a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java index 98653cb93a9..a23eab494f0 100644 --- a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java +++ b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java @@ -66,16 +66,16 @@ public MRAppMasterWithCeleborn( int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0); if (numReducers > 0) { CelebornConf conf = HadoopUtils.fromYarnConf(jobConf); - LifecycleManager lifecycleManager = - new LifecycleManager(applicationAttemptId.toString(), conf); + String appUniqueId = conf.appUniqueIdWithUUIDSuffix(applicationAttemptId.toString()); + LifecycleManager lifecycleManager = new LifecycleManager(appUniqueId, conf); String lmHost = lifecycleManager.getHost(); int lmPort = lifecycleManager.getPort(); - logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, applicationAttemptId); + logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, appUniqueId); JobConf lmConf = new JobConf(); lmConf.clear(); lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost); lmConf.set(HadoopUtils.MR_CELEBORN_LM_PORT, lmPort + ""); - lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, applicationAttemptId.toString()); + lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, appUniqueId); writeLifecycleManagerConfToTask(jobConf, lmConf); } } diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 4f6e835e726..fa0eb96abb1 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -95,7 +95,8 @@ private void initializeLifecycleManager(String appId) { if (isDriver && lifecycleManager == null) { synchronized (this) { if (lifecycleManager == null) { - lifecycleManager = new LifecycleManager(appId, celebornConf); + appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId); + lifecycleManager = new LifecycleManager(appUniqueId, celebornConf); if (celebornConf.clientFetchThrowsFetchFailure()) { MapOutputTrackerMaster mapOutputTracker = (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker(); @@ -113,8 +114,8 @@ public ShuffleHandle registerShuffle( // Note: generate app unique id at driver side, make sure dependency.rdd.context // is the same SparkContext among different shuffleIds. // This method may be called many times. - appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context()); - initializeLifecycleManager(appUniqueId); + String appId = SparkUtils.appUniqueId(dependency.rdd().context()); + initializeLifecycleManager(appId); lifecycleManager.registerAppShuffleDeterminate( shuffleId, diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index da785886ca7..8930c3cff43 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -129,7 +129,7 @@ private SortShuffleManager sortShuffleManager() { return _sortShuffleManager; } - private void initializeLifecycleManager() { + private void initializeLifecycleManager(String appId) { // Only create LifecycleManager singleton in Driver. When register shuffle multiple times, we // need to ensure that LifecycleManager will only be created once. Parallelism needs to be // considered in this place, because if there is one RDD that depends on multiple RDDs @@ -137,6 +137,7 @@ private void initializeLifecycleManager() { if (isDriver && lifecycleManager == null) { synchronized (this) { if (lifecycleManager == null) { + appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId); lifecycleManager = new LifecycleManager(appUniqueId, celebornConf); if (celebornConf.clientFetchThrowsFetchFailure()) { MapOutputTrackerMaster mapOutputTracker = @@ -156,8 +157,8 @@ public ShuffleHandle registerShuffle( // Note: generate app unique id at driver side, make sure dependency.rdd.context // is the same SparkContext among different shuffleIds. // This method may be called many times. - appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context()); - initializeLifecycleManager(); + String appId = SparkUtils.appUniqueId(dependency.rdd().context()); + initializeLifecycleManager(appId); lifecycleManager.registerAppShuffleDeterminate( shuffleId, diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 069f937a1ea..f66f67da690 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.common import java.io.{File, IOException} -import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap} +import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap, UUID} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -905,6 +905,15 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientExcludeReplicaOnFailureEnabled: Boolean = get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED) def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX) + def clientApplicationUUIDSuffixEnabled: Boolean = get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED) + + def appUniqueIdWithUUIDSuffix(appId: String): String = { + if (clientApplicationUUIDSuffixEnabled) { + appId + "-" + UUID.randomUUID().toString.replaceAll("-", "") + } else { + appId + } + } // ////////////////////////////////////////////////////// // Shuffle Compression // @@ -5055,6 +5064,14 @@ object CelebornConf extends Logging { .checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1 (inclusive)") .createWithDefault(0.4) + val CLIENT_APPLICATION_UUID_SUFFIX_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.client.application.uuidSuffix.enabled") + .categories("client") + .version("0.6.0") + .doc("Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id.") + .booleanConf + .createWithDefault(false) + val TEST_ALTERNATIVE: OptionalConfigEntry[String] = buildConf("celeborn.test.alternative.key") .withAlternative("celeborn.test.alternative.deprecatedKey") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 80cd1bef3e7..4925093d014 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -21,6 +21,7 @@ license: | | --- | ------- | --------- | ----------- | ----- | ---------- | | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval | | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | +| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. | 0.6.0 | | | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | | | celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | | | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | | From d7a1613a21d7470e4ccecf9b975e6100e8decb16 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 17 Oct 2024 06:01:54 +0000 Subject: [PATCH 2/2] [CELEBORN-1648] Update doc for UUID suffix --- .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 2 +- docs/configuration/client.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index f66f67da690..f2af8f6cd56 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -5068,7 +5068,7 @@ object CelebornConf extends Logging { buildConf("celeborn.client.application.uuidSuffix.enabled") .categories("client") .version("0.6.0") - .doc("Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id.") + .doc("Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR.") .booleanConf .createWithDefault(false) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 4925093d014..91d1faac52e 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -21,7 +21,7 @@ license: | | --- | ------- | --------- | ----------- | ----- | ---------- | | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval | | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | -| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. | 0.6.0 | | +| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | | | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | | | celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | | | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | |