diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index 9af5c1b2a636..0ecd9662ae93 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -564,7 +564,7 @@ jobs: fail-fast: false matrix: spark: [ "spark-3.2" ] - celeborn: [ "celeborn-0.5.4", "celeborn-0.4.3"] + celeborn: [ "celeborn-0.6.0", "celeborn-0.5.4", "celeborn-0.4.3"] runs-on: ubuntu-22.04 container: apache/gluten:centos-8-jdk8 steps: @@ -590,6 +590,8 @@ jobs: EXTRA_PROFILE="-Pceleborn-0.4" elif [ "${{ matrix.celeborn }}" = "celeborn-0.5.4" ]; then EXTRA_PROFILE="-Pceleborn-0.5" + elif [ "${{ matrix.celeborn }}" = "celeborn-0.6.0" ]; then + EXTRA_PROFILE="-Pceleborn-0.6" fi echo "EXTRA_PROFILE: ${EXTRA_PROFILE}" if [ ! -e "/opt/apache-${{ matrix.celeborn }}-bin.tgz" ]; then diff --git a/dev/docker/Dockerfile.centos8-dynamic-build b/dev/docker/Dockerfile.centos8-dynamic-build index be013f98faa7..a9619922054d 100644 --- a/dev/docker/Dockerfile.centos8-dynamic-build +++ b/dev/docker/Dockerfile.centos8-dynamic-build @@ -43,6 +43,7 @@ RUN set -ex; \ rm -rf ${local_binary}; \ wget -nv https://archive.apache.org/dist/celeborn/celeborn-0.4.3/apache-celeborn-0.4.3-bin.tgz -P /opt/; \ wget -nv https://archive.apache.org/dist/celeborn/celeborn-0.5.4/apache-celeborn-0.5.4-bin.tgz -P /opt/; \ + wget -nv https://archive.apache.org/dist/celeborn/celeborn-0.6.0/apache-celeborn-0.6.0-bin.tgz -P /opt/; \ wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.2/apache-uniffle-0.9.2-incubating-bin.tar.gz -P /opt/; \ wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz -P /opt/; \ git clone --depth=1 https://github.com/apache/incubator-gluten /opt/gluten; \ diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java index 2ee810d63b24..c963e4be208b 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java @@ -103,8 +103,10 @@ public class CelebornShuffleManager implements ShuffleManager, SupportsColumnarS // for Celeborn 0.4.0 private final Object shuffleIdTracker; - // for Celeborn 0.4.0 - private final boolean throwsFetchFailure; + // for Celeborn 0.6.0 + private final boolean stageRerunEnabled; + + private Object failedShuffleCleaner = null; public CelebornShuffleManager(SparkConf conf) { if (conf.getBoolean(LOCAL_SHUFFLE_READER_KEY, true)) { @@ -120,7 +122,7 @@ public CelebornShuffleManager(SparkConf conf) { this.shuffleIdTracker = CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME); - this.throwsFetchFailure = CelebornUtils.getThrowsFetchFailure(celebornConf); + this.stageRerunEnabled = CelebornUtils.getStageRerunEnabled(celebornConf); this.celebornDefaultCodec = CelebornConf.SHUFFLE_COMPRESSION_CODEC().defaultValueString(); @@ -162,7 +164,7 @@ private SparkShuffleManager vanillaCelebornShuffleManager() { return _vanillaCelebornShuffleManager; } - private void initializeLifecycleManager() { + private void initializeLifecycleManager(String appId) { // Only create LifecycleManager singleton in Driver. When register shuffle multiple times, we // need to ensure that LifecycleManager will only be created once. Parallelism needs to be // considered in this place, because if there is one RDD that depends on multiple RDDs @@ -170,10 +172,14 @@ private void initializeLifecycleManager() { if (isDriver() && lifecycleManager == null) { synchronized (this) { if (lifecycleManager == null) { + appUniqueId = CelebornUtils.getAppUniqueId(appId, celebornConf); lifecycleManager = new LifecycleManager(appUniqueId, celebornConf); - // for Celeborn 0.4.0 - CelebornUtils.registerShuffleTrackerCallback(throwsFetchFailure, lifecycleManager); + // for Celeborn 0.6.0 + CelebornUtils.incrementApplicationCount(lifecycleManager); + CelebornUtils.registerCancelShuffleCallback(lifecycleManager); + CelebornUtils.stageRerun( + stageRerunEnabled, lifecycleManager, celebornConf, failedShuffleCleaner); shuffleClient = CelebornUtils.getShuffleClient( @@ -199,7 +205,7 @@ private ShuffleHandle registerCelebornShuffleHandle( lifecycleManager.getPort(), lifecycleManager.getUserIdentifier(), shuffleId, - throwsFetchFailure, + stageRerunEnabled, dependency.rdd().getNumPartitions(), dependency); } @@ -207,15 +213,15 @@ private ShuffleHandle registerCelebornShuffleHandle( @Override public ShuffleHandle registerShuffle( int shuffleId, ShuffleDependency dependency) { - appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context()); - initializeLifecycleManager(); + String appId = SparkUtils.appUniqueId(dependency.rdd().context()); + initializeLifecycleManager(appId); // Note: generate app unique id at driver side, make sure dependency.rdd.context // is the same SparkContext among different shuffleIds. // This method may be called many times. if (dependency instanceof ColumnarShuffleDependency) { - if (fallbackPolicyRunner.applyAllFallbackPolicy( - lifecycleManager, dependency.partitioner().numPartitions())) { + CelebornUtils.incrementShuffleCount(lifecycleManager); + if (CelebornUtils.applyFallbackPolicies(fallbackPolicyRunner, lifecycleManager, dependency)) { if (GlutenConfig.get().enableCelebornFallback()) { logger.warn("Fallback to ColumnarShuffleManager!"); columnarShuffleIds.add(shuffleId); @@ -245,7 +251,7 @@ public boolean unregisterShuffle(int shuffleId) { shuffleIdTracker, shuffleId, appUniqueId, - throwsFetchFailure, + stageRerunEnabled, isDriver()); } @@ -273,6 +279,9 @@ public void stop() { _vanillaCelebornShuffleManager.stop(); _vanillaCelebornShuffleManager = null; } + if (failedShuffleCleaner != null) { + CelebornUtils.resetFailedShuffleCleaner(failedShuffleCleaner); + } } @Override @@ -307,12 +316,18 @@ public ShuffleWriter getWriter( false, extension); - // for Celeborn 0.5.2 try { - Field field = CelebornShuffleHandle.class.getDeclaredField("throwsFetchFailure"); + Field field; + try { + // for Celeborn 0.6.0 + field = CelebornShuffleHandle.class.getDeclaredField("stageRerunEnabled"); + } catch (NoSuchFieldException e) { + // for Celeborn 0.5.2 + field = CelebornShuffleHandle.class.getDeclaredField("throwsFetchFailure"); + } field.setAccessible(true); - boolean throwsFetchFailure = (boolean) field.get(handle); - if (throwsFetchFailure) { + boolean stageRerunEnabled = (boolean) field.get(handle); + if (stageRerunEnabled) { Method addFailureListenerMethod = SparkUtils.class.getMethod( "addFailureListenerIfBarrierTask", diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java index 6b4229ad3037..d28726ac3c7b 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java @@ -25,7 +25,9 @@ import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.rdd.DeterministicLevel; +import org.apache.spark.scheduler.SparkListener; import org.apache.spark.shuffle.ShuffleReadMetricsReporter; +import org.apache.spark.shuffle.celeborn.CelebornShuffleFallbackPolicyRunner; import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle; import org.apache.spark.shuffle.celeborn.CelebornShuffleReader; import org.apache.spark.shuffle.celeborn.SparkUtils; @@ -34,7 +36,11 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Method; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; public class CelebornUtils { @@ -42,6 +48,12 @@ public class CelebornUtils { public static final String EXECUTOR_SHUFFLE_ID_TRACKER_NAME = "org.apache.spark.shuffle.celeborn.ExecutorShuffleIdTracker"; + public static final String SHUFFLE_FETCH_FAILURE_REPORT_TASK_CLEAN_LISTENER_NAME = + "org.apache.spark.shuffle.celeborn.ShuffleFetchFailureReportTaskCleanListener"; + public static final String FAILED_SHUFFLE_CLEANER_NAME = + "org.apache.celeborn.spark.FailedShuffleCleaner"; + public static final String GET_REDUCER_FILE_GROUP_RESPONSE_NAME = + "org.apache.celeborn.common.protocol.message.ControlMessages.GetReducerFileGroupResponse"; public static boolean unregisterShuffle( LifecycleManager lifecycleManager, @@ -49,7 +61,7 @@ public static boolean unregisterShuffle( Object shuffleIdTracker, int appShuffleId, String appUniqueId, - boolean throwsFetchFailure, + boolean stageRerunEnabled, boolean isDriver) { try { try { @@ -60,7 +72,7 @@ public static boolean unregisterShuffle( lifecycleManager .getClass() .getMethod("unregisterAppShuffle", int.class, boolean.class); - unregisterAppShuffle.invoke(lifecycleManager, appShuffleId, throwsFetchFailure); + unregisterAppShuffle.invoke(lifecycleManager, appShuffleId, stageRerunEnabled); } } catch (NoSuchMethodException ex) { // for Celeborn 0.4.0 @@ -183,14 +195,14 @@ public static ShuffleClient getShuffleClient( } } - public static Object createInstance(String className) { + public static Object createInstance(String className, Object... args) { try { try { Class clazz = Class.forName(className); Constructor constructor = clazz.getConstructor(); - return constructor.newInstance(); + return constructor.newInstance(args); } catch (ClassNotFoundException e) { return null; @@ -206,7 +218,7 @@ public static CelebornShuffleHandle getCelebornShuffleHandle( int lifecycleManagerPort, UserIdentifier userIdentifier, int shuffleId, - boolean throwsFetchFailure, + boolean stageRerunEnabled, int numMappers, ShuffleDependency dependency) { try { @@ -228,7 +240,7 @@ public static CelebornShuffleHandle getCelebornShuffleHandle( lifecycleManagerPort, userIdentifier, shuffleId, - throwsFetchFailure, + stageRerunEnabled, numMappers, dependency); } catch (NoSuchMethodException noMethod) { @@ -327,11 +339,19 @@ public static Class getClassOrDefault(String className) { } } - public static boolean getThrowsFetchFailure(CelebornConf celebornConf) { + public static boolean getStageRerunEnabled(CelebornConf celebornConf) { try { - Method clientFetchThrowsFetchFailureMethod = - celebornConf.getClass().getDeclaredMethod("clientFetchThrowsFetchFailure"); - return (Boolean) clientFetchThrowsFetchFailureMethod.invoke(celebornConf); + Method clientStageRerunEnabledMethod; + try { + // for Celeborn 0.6.0 + clientStageRerunEnabledMethod = + celebornConf.getClass().getDeclaredMethod("clientStageRerunEnabled"); + } catch (NoSuchMethodException e) { + // for Celeborn 0.4.0 + clientStageRerunEnabledMethod = + celebornConf.getClass().getDeclaredMethod("clientFetchThrowsFetchFailure"); + } + return (Boolean) clientStageRerunEnabledMethod.invoke(celebornConf); } catch (NoSuchMethodException e) { return false; } catch (Exception e) { @@ -339,32 +359,48 @@ public static boolean getThrowsFetchFailure(CelebornConf celebornConf) { } } + public static void stageRerun( + boolean stageRerunEnabled, + LifecycleManager lifecycleManager, + CelebornConf celebornConf, + Object failedShuffleCleaner) { + if (stageRerunEnabled) { + MapOutputTrackerMaster mapOutputTracker = + (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker(); + + // for Celeborn 0.6.0 + registerReportTaskShuffleFetchFailurePreCheck(lifecycleManager); + + registerShuffleTrackerCallback(lifecycleManager, mapOutputTracker); + + // for Celeborn 0.6.0 + registerCelebornSkewShuffleCheckCallback(lifecycleManager, celebornConf); + fetchCleanFailedShuffle(lifecycleManager, celebornConf, failedShuffleCleaner); + reducerFileGroupBroadcast(lifecycleManager, celebornConf); + } + } + public static void registerShuffleTrackerCallback( - boolean throwsFetchFailure, LifecycleManager lifecycleManager) { + LifecycleManager lifecycleManager, MapOutputTrackerMaster mapOutputTracker) { try { - if (throwsFetchFailure) { - MapOutputTrackerMaster mapOutputTracker = - (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker(); + Method registerShuffleTrackerCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerShuffleTrackerCallback", Consumer.class); - Method registerShuffleTrackerCallbackMethod = - lifecycleManager - .getClass() - .getDeclaredMethod("registerShuffleTrackerCallback", Consumer.class); + Consumer consumer = + shuffleId -> { + try { + Method unregisterAllMapOutputMethod = + SparkUtils.class.getMethod( + "unregisterAllMapOutput", MapOutputTrackerMaster.class, int.class); + unregisterAllMapOutputMethod.invoke(null, mapOutputTracker, shuffleId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; - Consumer consumer = - shuffleId -> { - try { - Method unregisterAllMapOutputMethod = - SparkUtils.class.getMethod( - "unregisterAllMapOutput", MapOutputTrackerMaster.class, int.class); - unregisterAllMapOutputMethod.invoke(null, mapOutputTracker, shuffleId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - - registerShuffleTrackerCallbackMethod.invoke(lifecycleManager, consumer); - } + registerShuffleTrackerCallbackMethod.invoke(lifecycleManager, consumer); } catch (NoSuchMethodException e) { logger.debug("Executing the initializeLifecycleManager of celeborn-0.3.x"); } catch (Exception e) { @@ -391,4 +427,271 @@ public static void registerAppShuffleDeterminate( throw new RuntimeException(e); } } + + public static String getAppUniqueId(String appId, CelebornConf celebornConf) { + try { + // for Celeborn 0.6.0 + Method appUniqueIdWithUUIDSuffixMethod = + celebornConf.getClass().getDeclaredMethod("appUniqueIdWithUUIDSuffix", String.class); + return (String) appUniqueIdWithUUIDSuffixMethod.invoke(celebornConf, appId); + } catch (NoSuchMethodException e) { + return appId; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void incrementApplicationCount(LifecycleManager lifecycleManager) { + try { + // for Celeborn 0.6.0 + Method applicationCountMethod = + lifecycleManager.getClass().getDeclaredMethod("applicationCount"); + ((LongAdder) applicationCountMethod.invoke(lifecycleManager)).increment(); + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void registerCancelShuffleCallback(LifecycleManager lifecycleManager) { + try { + // for Celeborn 0.6.0 + Method registerCancelShuffleCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerCancelShuffleCallback", BiConsumer.class); + BiConsumer consumer = + (shuffleId, reason) -> { + try { + Method cancelShuffleMethod = + SparkUtils.class.getMethod("cancelShuffle", int.class, String.class); + cancelShuffleMethod.invoke(null, shuffleId, reason); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerCancelShuffleCallbackMethod.invoke(lifecycleManager, consumer); + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void registerReportTaskShuffleFetchFailurePreCheck( + LifecycleManager lifecycleManager) { + try { + // for Celeborn 0.6.0 + Method registerReportTaskShuffleFetchFailurePreCheckMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerReportTaskShuffleFetchFailurePreCheck", Function.class); + Function function = + taskId -> { + try { + Method shouldReportShuffleFetchFailureMethod = + SparkUtils.class.getMethod("shouldReportShuffleFetchFailure", long.class); + return (Boolean) shouldReportShuffleFetchFailureMethod.invoke(null, taskId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerReportTaskShuffleFetchFailurePreCheckMethod.invoke(lifecycleManager, function); + Method addSparkListenerMethod = + SparkUtils.class.getMethod("addSparkListener", SparkListener.class); + addSparkListenerMethod.invoke( + null, createInstance(SHUFFLE_FETCH_FAILURE_REPORT_TASK_CLEAN_LISTENER_NAME)); + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void registerCelebornSkewShuffleCheckCallback( + LifecycleManager lifecycleManager, CelebornConf celebornConf) { + try { + // for Celeborn 0.6.0 + Method clientAdaptiveOptimizeSkewedPartitionReadEnabledMethod = + celebornConf + .getClass() + .getDeclaredMethod("clientAdaptiveOptimizeSkewedPartitionReadEnabled"); + if ((Boolean) clientAdaptiveOptimizeSkewedPartitionReadEnabledMethod.invoke(celebornConf)) { + Method registerCelebornSkewShuffleCheckCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerCelebornSkewShuffleCheckCallback", Function.class); + Function function = + appShuffleId -> { + try { + Method isCelebornSkewShuffleOrChildShuffleMethod = + SparkUtils.class.getMethod("isCelebornSkewShuffleOrChildShuffle", int.class); + return (Boolean) + isCelebornSkewShuffleOrChildShuffleMethod.invoke(null, appShuffleId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerCelebornSkewShuffleCheckCallbackMethod.invoke(lifecycleManager, function); + } + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void fetchCleanFailedShuffle( + LifecycleManager lifecycleManager, CelebornConf celebornConf, Object failedShuffleCleaner) { + try { + // for Celeborn 0.6.0 + Method clientFetchCleanFailedShuffleMethod = + celebornConf.getClass().getDeclaredMethod("clientFetchCleanFailedShuffle"); + if ((Boolean) clientFetchCleanFailedShuffleMethod.invoke(celebornConf)) { + failedShuffleCleaner = createInstance(FAILED_SHUFFLE_CLEANER_NAME, lifecycleManager); + Method registerValidateCelebornShuffleIdForCleanCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod( + "registerValidateCelebornShuffleIdForCleanCallback", Consumer.class); + Object shuffleCleaner = failedShuffleCleaner; + Consumer addShuffleIdToBeCleanedConsumer = + appShuffleIdentifier -> { + try { + Method addShuffleIdToBeCleanedMethod = + shuffleCleaner + .getClass() + .getDeclaredMethod("addShuffleIdToBeCleaned", String.class); + addShuffleIdToBeCleanedMethod.invoke(shuffleCleaner, appShuffleIdentifier); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerValidateCelebornShuffleIdForCleanCallbackMethod.invoke( + lifecycleManager, addShuffleIdToBeCleanedConsumer); + Method registerUnregisterShuffleCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerUnregisterShuffleCallback", Consumer.class); + Consumer removeCleanedShuffleIdConsumer = + celebornShuffleId -> { + try { + Method removeCleanedShuffleIdMethod = + shuffleCleaner + .getClass() + .getDeclaredMethod("removeCleanedShuffleId", int.class); + removeCleanedShuffleIdMethod.invoke(shuffleCleaner, celebornShuffleId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerUnregisterShuffleCallbackMethod.invoke( + lifecycleManager, removeCleanedShuffleIdConsumer); + } + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void reducerFileGroupBroadcast( + LifecycleManager lifecycleManager, CelebornConf celebornConf) { + try { + // for Celeborn 0.6.0 + Method getReducerFileGroupBroadcastEnabledMethod = + celebornConf.getClass().getDeclaredMethod("getReducerFileGroupBroadcastEnabled"); + if ((Boolean) getReducerFileGroupBroadcastEnabledMethod.invoke(celebornConf)) { + Method registerBroadcastGetReducerFileGroupResponseCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod( + "registerBroadcastGetReducerFileGroupResponseCallback", BiFunction.class); + BiFunction function = + (shuffleId, getReducerFileGroupResponse) -> { + try { + Method serializeGetReducerFileGroupResponseMethod = + SparkUtils.class.getMethod( + "serializeGetReducerFileGroupResponse", + Integer.class, + getClassOrDefault(GET_REDUCER_FILE_GROUP_RESPONSE_NAME)); + return (byte[]) + serializeGetReducerFileGroupResponseMethod.invoke( + null, shuffleId, getReducerFileGroupResponse); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerBroadcastGetReducerFileGroupResponseCallbackMethod.invoke( + lifecycleManager, function); + Method registerInvalidatedBroadcastCallbackMethod = + lifecycleManager + .getClass() + .getDeclaredMethod("registerInvalidatedBroadcastCallback", Consumer.class); + Consumer consumer = + shuffleId -> { + try { + Method invalidateSerializedGetReducerFileGroupResponseMethod = + SparkUtils.class.getMethod( + "invalidateSerializedGetReducerFileGroupResponse", Integer.class); + invalidateSerializedGetReducerFileGroupResponseMethod.invoke(null, shuffleId); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + registerInvalidatedBroadcastCallbackMethod.invoke(lifecycleManager, consumer); + } + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void incrementShuffleCount(LifecycleManager lifecycleManager) { + try { + // for Celeborn 0.6.0 + Method shuffleCountMethod = lifecycleManager.getClass().getDeclaredMethod("shuffleCount"); + ((LongAdder) shuffleCountMethod.invoke(lifecycleManager)).increment(); + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static boolean applyFallbackPolicies( + CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner, + LifecycleManager lifecycleManager, + ShuffleDependency dependency) { + try { + try { + // for Celeborn 0.6.0 + Method applyFallbackPoliciesMethod = + fallbackPolicyRunner + .getClass() + .getDeclaredMethod( + "applyFallbackPolicies", ShuffleDependency.class, LifecycleManager.class); + return (Boolean) + applyFallbackPoliciesMethod.invoke(fallbackPolicyRunner, dependency, lifecycleManager); + } catch (NoSuchMethodException e) { + Method applyAllFallbackPolicyMethod = + fallbackPolicyRunner + .getClass() + .getDeclaredMethod("applyAllFallbackPolicy", LifecycleManager.class, int.class); + return (Boolean) + applyAllFallbackPolicyMethod.invoke( + fallbackPolicyRunner, lifecycleManager, dependency.partitioner().numPartitions()); + } + } catch (NoSuchMethodException e) { + return false; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void resetFailedShuffleCleaner(Object failedShuffleCleaner) { + try { + // for Celeborn 0.6.0 + Method resetMethod = failedShuffleCleaner.getClass().getDeclaredMethod("reset"); + resetMethod.invoke(failedShuffleCleaner); + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/pom.xml b/pom.xml index a311a9f424e1..b321e011efea 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ delta-core 2.4.0 24 - 0.5.4 + 0.6.0 0.9.2 15.0.0 15.0.0-gluten diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index 823dcb0c9fc5..2e0653b328a2 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -21,7 +21,7 @@ 3.4.4 2.12 3 - 0.5.4 + 0.6.0 0.9.2 1.5.0-SNAPSHOT 32.0.1-jre @@ -183,7 +183,13 @@ celeborn-0.5 - 0.5.3 + 0.5.4 + + + + celeborn-0.6 + + 0.6.0