Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ jobs:
fail-fast: false
matrix:
spark: [ "spark-3.2" ]
celeborn: [ "celeborn-0.5.4", "celeborn-0.4.3"]
celeborn: [ "celeborn-0.6.0", "celeborn-0.5.4", "celeborn-0.4.3"]
runs-on: ubuntu-22.04
container: apache/gluten:centos-8-jdk8
steps:
Expand All @@ -590,6 +590,8 @@ jobs:
EXTRA_PROFILE="-Pceleborn-0.4"
elif [ "${{ matrix.celeborn }}" = "celeborn-0.5.4" ]; then
EXTRA_PROFILE="-Pceleborn-0.5"
elif [ "${{ matrix.celeborn }}" = "celeborn-0.6.0" ]; then
EXTRA_PROFILE="-Pceleborn-0.6"
fi
echo "EXTRA_PROFILE: ${EXTRA_PROFILE}"
if [ ! -e "/opt/apache-${{ matrix.celeborn }}-bin.tgz" ]; then
Expand Down
1 change: 1 addition & 0 deletions dev/docker/Dockerfile.centos8-dynamic-build
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ RUN set -ex; \
rm -rf ${local_binary}; \
wget -nv https://archive.apache.org/dist/celeborn/celeborn-0.4.3/apache-celeborn-0.4.3-bin.tgz -P /opt/; \
wget -nv https://archive.apache.org/dist/celeborn/celeborn-0.5.4/apache-celeborn-0.5.4-bin.tgz -P /opt/; \
wget -nv https://archive.apache.org/dist/celeborn/celeborn-0.6.0/apache-celeborn-0.6.0-bin.tgz -P /opt/; \
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.2/apache-uniffle-0.9.2-incubating-bin.tar.gz -P /opt/; \
wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz -P /opt/; \
git clone --depth=1 https://github.com/apache/incubator-gluten /opt/gluten; \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS
// for Celeborn 0.4.0
private final Object shuffleIdTracker;

// for Celeborn 0.4.0
private final boolean throwsFetchFailure;
// for Celeborn 0.6.0
private final boolean stageRerunEnabled;

private Object failedShuffleCleaner = null;

public CelebornShuffleManager(SparkConf conf) {
if (conf.getBoolean(LOCAL_SHUFFLE_READER_KEY, true)) {
Expand All @@ -120,7 +122,7 @@ public CelebornShuffleManager(SparkConf conf) {
this.shuffleIdTracker =
CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME);

this.throwsFetchFailure = CelebornUtils.getThrowsFetchFailure(celebornConf);
this.stageRerunEnabled = CelebornUtils.getStageRerunEnabled(celebornConf);

this.celebornDefaultCodec = CelebornConf.SHUFFLE_COMPRESSION_CODEC().defaultValueString();

Expand Down Expand Up @@ -162,18 +164,22 @@ private SparkShuffleManager vanillaCelebornShuffleManager() {
return _vanillaCelebornShuffleManager;
}

private void initializeLifecycleManager() {
private void initializeLifecycleManager(String appId) {
// Only create LifecycleManager singleton in Driver. When register shuffle multiple times, we
// need to ensure that LifecycleManager will only be created once. Parallelism needs to be
// considered in this place, because if there is one RDD that depends on multiple RDDs
// at the same time, it may bring parallel `register shuffle`, such as Join in Sql.
if (isDriver() && lifecycleManager == null) {
synchronized (this) {
if (lifecycleManager == null) {
appUniqueId = CelebornUtils.getAppUniqueId(appId, celebornConf);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);

// for Celeborn 0.4.0
CelebornUtils.registerShuffleTrackerCallback(throwsFetchFailure, lifecycleManager);
// for Celeborn 0.6.0
CelebornUtils.incrementApplicationCount(lifecycleManager);
CelebornUtils.registerCancelShuffleCallback(lifecycleManager);
CelebornUtils.stageRerun(
stageRerunEnabled, lifecycleManager, celebornConf, failedShuffleCleaner);

shuffleClient =
CelebornUtils.getShuffleClient(
Expand All @@ -199,23 +205,23 @@ private <K, V, C> ShuffleHandle registerCelebornShuffleHandle(
lifecycleManager.getPort(),
lifecycleManager.getUserIdentifier(),
shuffleId,
throwsFetchFailure,
stageRerunEnabled,
dependency.rdd().getNumPartitions(),
dependency);
}

@Override
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager();
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);

// Note: generate app unique id at driver side, make sure dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
if (dependency instanceof ColumnarShuffleDependency) {
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
CelebornUtils.incrementShuffleCount(lifecycleManager);
if (CelebornUtils.applyFallbackPolicies(fallbackPolicyRunner, lifecycleManager, dependency)) {
if (GlutenConfig.get().enableCelebornFallback()) {
logger.warn("Fallback to ColumnarShuffleManager!");
columnarShuffleIds.add(shuffleId);
Expand Down Expand Up @@ -245,7 +251,7 @@ public boolean unregisterShuffle(int shuffleId) {
shuffleIdTracker,
shuffleId,
appUniqueId,
throwsFetchFailure,
stageRerunEnabled,
isDriver());
}

Expand Down Expand Up @@ -273,6 +279,9 @@ public void stop() {
_vanillaCelebornShuffleManager.stop();
_vanillaCelebornShuffleManager = null;
}
if (failedShuffleCleaner != null) {
CelebornUtils.resetFailedShuffleCleaner(failedShuffleCleaner);
}
}

@Override
Expand Down Expand Up @@ -307,12 +316,18 @@ public <K, V> ShuffleWriter<K, V> getWriter(
false,
extension);

// for Celeborn 0.5.2
try {
Field field = CelebornShuffleHandle.class.getDeclaredField("throwsFetchFailure");
Field field;
try {
// for Celeborn 0.6.0
field = CelebornShuffleHandle.class.getDeclaredField("stageRerunEnabled");
} catch (NoSuchFieldException e) {
// for Celeborn 0.5.2
field = CelebornShuffleHandle.class.getDeclaredField("throwsFetchFailure");
}
field.setAccessible(true);
boolean throwsFetchFailure = (boolean) field.get(handle);
if (throwsFetchFailure) {
boolean stageRerunEnabled = (boolean) field.get(handle);
if (stageRerunEnabled) {
Method addFailureListenerMethod =
SparkUtils.class.getMethod(
"addFailureListenerIfBarrierTask",
Expand Down
Loading