@@ -345,8 +345,8 @@ index 3609548f374..d34f43bf064 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+ CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle
+
+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
+ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }

@@ -349,8 +349,8 @@ index af689db3379..39d0b3132ee 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+ CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle
+
+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
+ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }

@@ -349,8 +349,8 @@ index dbed66683b0..d656c8af6b7 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+ CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle
+
+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
+ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }

@@ -361,8 +361,8 @@ index 9370b3d8d1d..d36e26a1376 100644
+ val celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled =
+ CelebornShuffleState.celebornAdaptiveOptimizeSkewedPartitionReadEnabled && isCelebornShuffle
+
+ val throwsFetchFailure = CelebornShuffleState.celebornStageRerunEnabled
+ if (throwsFetchFailure && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ val stageRerunEnabled = CelebornShuffleState.celebornStageRerunEnabled
+ if (stageRerunEnabled && celebornClientAdaptiveOptimizeSkewedPartitionReadEnabled) {
+ logInfo(s"Celeborn shuffle retry enabled and shuffle $shuffleId is skewed")
+ CelebornShuffleState.registerCelebornSkewedShuffle(shuffleId)
+ }

@@ -209,7 +209,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
celebornConf,
h.userIdentifier(),
h.extension());
if (h.throwsFetchFailure()) {
if (h.stageRerunEnabled()) {
SparkUtils.addFailureListenerIfBarrierTask(client, context, h);
}
int shuffleId = SparkUtils.celebornShuffleId(client, h, context, true);

@@ -160,7 +160,7 @@ public static int celebornShuffleId(
CelebornShuffleHandle<?, ?, ?> handle,
TaskContext context,
Boolean isWriter) {
if (handle.throwsFetchFailure()) {
if (handle.stageRerunEnabled()) {
String appShuffleIdentifier = getAppShuffleIdentifier(handle.shuffleId(), context);
Tuple2<Integer, Boolean> res =
client.getShuffleId(

@@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C](
val lifecycleManagerPort: Int,
val userIdentifier: UserIdentifier,
shuffleId: Int,
val throwsFetchFailure: Boolean,
val stageRerunEnabled: Boolean,
numMappers: Int,
dependency: ShuffleDependency[K, V, C],
val extension: Array[Byte])
@@ -39,15 +39,15 @@ class CelebornShuffleHandle[K, V, C](
lifecycleManagerPort: Int,
userIdentifier: UserIdentifier,
shuffleId: Int,
throwsFetchFailure: Boolean,
stageRerunEnabled: Boolean,
numMappers: Int,
dependency: ShuffleDependency[K, V, C]) = this(
appUniqueId,
lifecycleManagerHost,
lifecycleManagerPort,
userIdentifier,
shuffleId,
throwsFetchFailure,
stageRerunEnabled,
numMappers,
dependency,
null)

@@ -69,7 +69,7 @@ class CelebornShuffleReader[K, C](
} catch {
case e: CelebornRuntimeException =>
logError(s"Failed to get shuffleId for appShuffleId ${handle.shuffleId}", e)
if (handle.throwsFetchFailure) {
if (handle.stageRerunEnabled) {
throw new FetchFailedException(
null,
handle.shuffleId,
@@ -142,7 +142,7 @@ class CelebornShuffleReader[K, C](
if (exceptionRef.get() != null) {
exceptionRef.get() match {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
if (handle.throwsFetchFailure &&
if (handle.stageRerunEnabled &&
shuffleClient.reportShuffleFetchFailure(
handle.shuffleId,
shuffleId,
@@ -179,7 +179,7 @@ class CelebornShuffleReader[K, C](
iter
} catch {
case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
if (handle.throwsFetchFailure &&
if (handle.stageRerunEnabled &&
shuffleClient.reportShuffleFetchFailure(
handle.shuffleId,
shuffleId,

@@ -288,7 +288,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
celebornConf,
h.userIdentifier(),
h.extension());
if (h.throwsFetchFailure()) {
if (h.stageRerunEnabled()) {
SparkUtils.addFailureListenerIfBarrierTask(shuffleClient, context, h);
}
int shuffleId = SparkUtils.celebornShuffleId(shuffleClient, h, context, true);

@@ -133,7 +133,7 @@ public static int celebornShuffleId(
CelebornShuffleHandle<?, ?, ?> handle,
TaskContext context,
Boolean isWriter) {
if (handle.throwsFetchFailure()) {
if (handle.stageRerunEnabled()) {
String appShuffleIdentifier =
SparkCommonUtils.encodeAppShuffleIdentifier(handle.shuffleId(), context);
Tuple2<Integer, Boolean> res =

@@ -28,7 +28,7 @@ class CelebornShuffleHandle[K, V, C](
val lifecycleManagerPort: Int,
val userIdentifier: UserIdentifier,
shuffleId: Int,
val throwsFetchFailure: Boolean,
val stageRerunEnabled: Boolean,
val numMappers: Int,
dependency: ShuffleDependency[K, V, C],
val extension: Array[Byte])
@@ -39,15 +39,15 @@ class CelebornShuffleHandle[K, V, C](
lifecycleManagerPort: Int,
userIdentifier: UserIdentifier,
shuffleId: Int,
throwsFetchFailure: Boolean,
stageRerunEnabled: Boolean,
numMappers: Int,
dependency: ShuffleDependency[K, V, C]) = this(
appUniqueId,
lifecycleManagerHost,
lifecycleManagerPort,
userIdentifier,
shuffleId,
throwsFetchFailure,
stageRerunEnabled,
numMappers,
dependency,
null)

@@ -94,7 +94,7 @@ class CelebornShuffleReader[K, C](
handle.extension)

private val exceptionRef = new AtomicReference[IOException]
private val throwsFetchFailure = handle.throwsFetchFailure
private val stageRerunEnabled = handle.stageRerunEnabled
private val encodedAttemptId = SparkCommonUtils.getEncodedAttemptNumber(context)

override def read(): Iterator[Product2[K, C]] = {
@@ -107,7 +107,7 @@ class CelebornShuffleReader[K, C](
} catch {
case e: CelebornRuntimeException =>
logError(s"Failed to get shuffleId for appShuffleId ${handle.shuffleId}", e)
if (throwsFetchFailure) {
if (stageRerunEnabled) {
throw new FetchFailedException(
null,
handle.shuffleId,
@@ -329,7 +329,7 @@ class CelebornShuffleReader[K, C](
context.taskAttemptId(),
startMapIndex,
endMapIndex,
if (throwsFetchFailure) ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER
if (stageRerunEnabled) ExceptionMakerHelper.SHUFFLE_FETCH_FAILURE_EXCEPTION_MAKER
else null,
locationList,
streamHandlers,
@@ -513,7 +513,7 @@ class CelebornShuffleReader[K, C](
shuffleId: Int,
partitionId: Int,
ce: Throwable) = {
if (throwsFetchFailure &&
if (stageRerunEnabled &&
shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, context.taskAttemptId())) {
logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", ce)
throw new FetchFailedException(

@@ -853,7 +853,7 @@ private boolean fillBuffer() throws IOException {
if (exceptionMaker != null) {
if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, taskId)) {
/*
* [[ExceptionMaker.makeException]], for spark applications with celeborn.client.spark.fetch.throwsFetchFailure enabled will result in creating
* [[ExceptionMaker.makeException]], for Spark applications with celeborn.client.spark.stageRerun.enabled set to true, will result in creating
* a FetchFailedException; and that will make the TaskContext as failed with shuffle fetch issues - see SPARK-19276 for more.
* Given this, Celeborn can wrap the FetchFailedException with our CelebornIOException
*/

2 changes: 2 additions & 0 deletions docs/migration.md
@@ -23,6 +23,8 @@ license: |

# Upgrading from 0.5 to 0.6

- Since 0.6.0, Celeborn deprecates `celeborn.client.spark.fetch.throwsFetchFailure`. Please use `celeborn.client.spark.stageRerun.enabled` instead.
-
- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` if you want to set user level quota.

- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` if you want to set user level quota.
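
For reference, a minimal sketch of switching a Spark job to the new key; both key names appear elsewhere in this diff, while the surrounding SparkConf setup is purely illustrative:

import org.apache.spark.SparkConf

// Deprecated since 0.6.0:
//   spark.celeborn.client.spark.fetch.throwsFetchFailure=true
// Preferred replacement (same setting used by the updated test suites below):
val conf = new SparkConf()
  .set("spark.celeborn.client.spark.stageRerun.enabled", "true")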

@@ -85,7 +85,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
}
}

test("celeborn spark integration test - unregister shuffle with throwsFetchFailure disabled") {
test("celeborn spark integration test - unregister shuffle with stageRerun disabled") {
if (Spark3OrNewer) {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2,3]")
val sparkSession = SparkSession.builder()
@@ -241,7 +241,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite
}
}

test(s"celeborn spark integration test - resubmit an unordered barrier stage with throwsFetchFailure enabled") {
test(
s"celeborn spark integration test - resubmit an unordered barrier stage with stageRerun enabled") {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]")
val sparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
@@ -285,7 +286,8 @@ class CelebornFetchFailureSuite extends AnyFunSuite
}
}

test(s"celeborn spark integration test - fetch failure in child of an unordered barrier stage with throwsFetchFailure enabled") {
test(s"celeborn spark integration test - fetch failure in child of an unordered " +
s"barrier stage with stageRerun enabled") {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2]")
val sparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))

@@ -54,7 +54,7 @@ class SparkUtilsSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -115,7 +115,7 @@ class SparkUtilsSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
@@ -167,7 +167,7 @@ class SparkUtilsSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")