Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ public MRAppMasterWithCeleborn(
int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (numReducers > 0) {
CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
LifecycleManager lifecycleManager =
new LifecycleManager(applicationAttemptId.toString(), conf);
String appUniqueId = conf.appUniqueIdWithUUIDSuffix(applicationAttemptId.toString());
LifecycleManager lifecycleManager = new LifecycleManager(appUniqueId, conf);
String lmHost = lifecycleManager.getHost();
int lmPort = lifecycleManager.getPort();
logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, applicationAttemptId);
logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, appUniqueId);
JobConf lmConf = new JobConf();
lmConf.clear();
lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost);
lmConf.set(HadoopUtils.MR_CELEBORN_LM_PORT, lmPort + "");
lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, applicationAttemptId.toString());
lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, appUniqueId);
writeLifecycleManagerConfToTask(jobConf, lmConf);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ private void initializeLifecycleManager(String appId) {
if (isDriver && lifecycleManager == null) {
synchronized (this) {
if (lifecycleManager == null) {
lifecycleManager = new LifecycleManager(appId, celebornConf);
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
if (celebornConf.clientFetchThrowsFetchFailure()) {
MapOutputTrackerMaster mapOutputTracker =
(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
Expand All @@ -113,8 +114,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
// Note: generate app unique id at driver side, make sure dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appUniqueId);
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);

lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,15 @@ private SortShuffleManager sortShuffleManager() {
return _sortShuffleManager;
}

private void initializeLifecycleManager() {
private void initializeLifecycleManager(String appId) {
// Only create LifecycleManager singleton in Driver. When register shuffle multiple times, we
// need to ensure that LifecycleManager will only be created once. Parallelism needs to be
// considered in this place, because if there is one RDD that depends on multiple RDDs
// at the same time, it may bring parallel `register shuffle`, such as Join in Sql.
if (isDriver && lifecycleManager == null) {
synchronized (this) {
if (lifecycleManager == null) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
if (celebornConf.clientFetchThrowsFetchFailure()) {
MapOutputTrackerMaster mapOutputTracker =
Expand All @@ -156,8 +157,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
// Note: generate app unique id at driver side, make sure dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager();
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);

lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.celeborn.common

import java.io.{File, IOException}
import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap}
import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap, UUID}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -905,6 +905,15 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientExcludeReplicaOnFailureEnabled: Boolean =
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
def clientApplicationUUIDSuffixEnabled: Boolean = get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)

def appUniqueIdWithUUIDSuffix(appId: String): String = {
if (clientApplicationUUIDSuffixEnabled) {
appId + "-" + UUID.randomUUID().toString.replaceAll("-", "")
} else {
appId
}
}

// //////////////////////////////////////////////////////
// Shuffle Compression //
Expand Down Expand Up @@ -5055,6 +5064,14 @@ object CelebornConf extends Logging {
.checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1 (inclusive)")
.createWithDefault(0.4)

val CLIENT_APPLICATION_UUID_SUFFIX_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.application.uuidSuffix.enabled")
.categories("client")
.version("0.6.0")
.doc("Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR.")
.booleanConf
.createWithDefault(false)

val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
buildConf("celeborn.test.alternative.key")
.withAlternative("celeborn.test.alternative.deprecatedKey")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ license: |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval |
| celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | |
| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | |
| celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | |
| celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | |
Expand Down