
Commit 407624d

wangyum authored and GitHub Enterprise committed
[CARMEL-7356][CARMEL-3130] Better fetch fail handling and MapOutputTracker improvement (apache#113)
1 parent 138a1a5 commit 407624d

6 files changed: +100 additions, -23 deletions


common/network-common/src/main/java/org/apache/spark/network/util/NettyMemoryMetrics.java

Lines changed: 29 additions & 3 deletions
@@ -26,9 +26,7 @@
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.netty.buffer.*;

 /**
  * A Netty memory metrics class to collect metrics from Netty PooledByteBufAllocator.
@@ -136,6 +134,34 @@ private void registerArenaMetric(PoolArenaMetric arenaMetric, String arenaName)
         });
       }
     }
+
+    String chunkMetricName = MetricRegistry.name(metricPrefix, arenaName, "chunksInfo");
+    allMetrics.put(chunkMetricName, (Gauge<String>) () -> {
+      try {
+        synchronized (arenaMetric) {
+          List<PoolChunkListMetric> chunkListMetricsList = arenaMetric.chunkLists();
+          StringBuilder builder = new StringBuilder();
+          for (PoolChunkListMetric chunkListMetrics : chunkListMetricsList) {
+            builder.append(System.lineSeparator());
+            builder.append("chunkList-" + chunkListMetrics.minUsage() + "-" +
+              chunkListMetrics.maxUsage() + ":");
+            builder.append(System.lineSeparator());
+            int chunkIdx = 0;
+            for (PoolChunkMetric chunkMetric : chunkListMetrics) {
+              String chunkName = "chunk-" + chunkIdx;
+              String chunkInfo = String.format(Locale.US, "%s: chunkSize:%s, freeBytes:%s, usage:%s",
+                chunkName, chunkMetric.chunkSize(), chunkMetric.freeBytes(), chunkMetric.usage());
+              builder.append("\t" + chunkInfo);
+              builder.append(System.lineSeparator());
+              chunkIdx++;
+            }
+          }
+          return builder.toString();
+        }
+      } catch (Exception e) {
+        return ""; // Swallow the exceptions.
+      }
+    });
   }

   @Override
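The new per-arena gauge exposes the allocator's chunk lists as a human-readable string under the "chunksInfo" name. As a rough, hedged illustration of how such a gauge can be consumed, the Scala sketch below dumps every chunksInfo gauge from a Codahale MetricRegistry; only the "chunksInfo" suffix comes from this patch, while the way the registry is obtained is an assumption.

import scala.jdk.CollectionConverters._
import com.codahale.metrics.MetricRegistry

// Print every per-arena chunksInfo gauge currently registered.
// Each value is the multi-line string built by the gauge above:
// one "chunkList-min-max:" header per chunk list, then one line per chunk.
def dumpChunkInfo(registry: MetricRegistry): Unit = {
  registry.getGauges().asScala
    .filter { case (name, _) => name.endsWith("chunksInfo") }
    .foreach { case (name, gauge) => println(s"$name -> ${gauge.getValue}") }
}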

common/network-common/src/test/java/org/apache/spark/network/util/NettyMemoryMetricsSuite.java

Lines changed: 2 additions & 2 deletions
@@ -136,7 +136,7 @@ public void testAdditionalMetrics() throws IOException, InterruptedException {
       Assert.assertTrue(name.startsWith("shuffle-server"));
       String metricName = name.substring(name.lastIndexOf(".") + 1);
       Assert.assertTrue(metricName.equals("usedDirectMemory")
-        || metricName.equals("usedHeapMemory")
+        || metricName.equals("usedHeapMemory") || metricName.equals("chunksInfo")
        || NettyMemoryMetrics.VERBOSE_METRICS.contains(metricName));
     });

@@ -145,7 +145,7 @@ public void testAdditionalMetrics() throws IOException, InterruptedException {
       Assert.assertTrue(name.startsWith("shuffle-client"));
       String metricName = name.substring(name.lastIndexOf(".") + 1);
       Assert.assertTrue(metricName.equals("usedDirectMemory")
-        || metricName.equals("usedHeapMemory")
+        || metricName.equals("usedHeapMemory") || metricName.equals("chunksInfo")
        || NettyMemoryMetrics.VERBOSE_METRICS.contains(metricName));
     });

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 31 additions & 4 deletions
@@ -117,6 +117,14 @@ private class ShuffleStatus(
    */
   private[this] var cachedSerializedMapStatus: Array[Byte] = _

+  /**
+   * Sometimes map statuses may fail to serialize. If too many reducers are waiting for map
+   * status serialization, they will occupy all the MessageLoop threads and block other map
+   * output requests, so cache the exception so that other reducers' requests can fail fast.
+   */
+  @volatile
+  private[this] var mapStatusSerializedException: Option[Throwable] = None
+
   /**
    * Broadcast variable holding serialized map output statuses array. When [[serializedMapStatus]]
    * serializes the map statuses array it may detect that the result is too large to send in a
@@ -349,15 +357,27 @@ private class ShuffleStatus(
     withReadLock {
       if (cachedSerializedMapStatus != null) {
         result = cachedSerializedMapStatus
+      } else if (mapStatusSerializedException.isDefined) {
+        throw mapStatusSerializedException.get
       }
     }

     if (result == null) withWriteLock {
       if (cachedSerializedMapStatus == null) {
-        val serResult = MapOutputTracker.serializeOutputStatuses[MapStatus](
-          mapStatuses, broadcastManager, isLocal, minBroadcastSize, conf)
-        cachedSerializedMapStatus = serResult._1
-        cachedSerializedBroadcast = serResult._2
+        if (mapStatusSerializedException.isDefined) {
+          throw mapStatusSerializedException.get
+        }
+        try {
+          val serResult = MapOutputTracker.serializeOutputStatuses[MapStatus](
+            mapStatuses, broadcastManager, isLocal, minBroadcastSize, conf)
+          cachedSerializedMapStatus = serResult._1
+          cachedSerializedBroadcast = serResult._2
+          mapStatusSerializedException = None
+        } catch {
+          case NonFatal(e) =>
+            mapStatusSerializedException = Some(e)
+            throw e
+        }
       }
       // The following line has to be outside if statement since it's possible that another thread
       // initializes cachedSerializedMapStatus in-between `withReadLock` and `withWriteLock`.
@@ -929,6 +949,13 @@ private[spark] class MapOutputTrackerMaster(
     incrementEpoch()
   }

+  def removeOutputsOnExecutorForShuffle(execId: String, shuffleId: Int): Unit = {
+    shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+      shuffleStatus.removeOutputsOnExecutor(execId)
+    }
+    incrementEpoch()
+  }
+
   /** Check if the given shuffle is being tracked */
   def containsShuffle(shuffleId: Int): Boolean = shuffleStatuses.contains(shuffleId)
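The comment and try/catch added above implement a small fail-fast memoization: once serializing the map statuses has failed, later requests should hit the cached exception instead of queueing up behind another doomed serialization attempt and tying up MessageLoop threads. Below is a minimal, self-contained Scala sketch of the same pattern, with a generic serialize thunk standing in for MapOutputTracker.serializeOutputStatuses; it illustrates the idea and is not the Spark code itself.

import scala.util.control.NonFatal

// Caches either the serialized result or the first failure, so that concurrent callers
// fail fast instead of repeating the expensive, already-doomed work.
class CachedSerializer(serialize: () => Array[Byte]) {
  private var cached: Array[Byte] = _
  private var failure: Option[Throwable] = None

  def get(): Array[Byte] = synchronized {
    failure.foreach(e => throw e)   // fail fast on a previously observed error
    if (cached == null) {
      try {
        cached = serialize()
      } catch {
        case NonFatal(e) =>
          failure = Some(e)         // remember the failure for subsequent callers
          throw e
      }
    }
    cached
  }
}

In the real code the read path checks the cached exception under the read lock before falling through to the write lock, so the sketch's single synchronized block is a simplification.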

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
@@ -955,6 +955,13 @@ package object config {
     .booleanConf
     .createWithDefault(false)

+  private[spark] val REMOVE_EXECUTOR_ON_FETCH_FAILURE =
+    ConfigBuilder("spark.files.fetchFailure.removeExecutor")
+      .doc("Whether to remove executor on a FetchFailure.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
       .doc("The default capacity for event queues. Spark will try to initialize " +

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 23 additions & 13 deletions
@@ -41,7 +41,7 @@ import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
-import org.apache.spark.internal.config.{JOB_GROUP_MAX_SHUFFLE_SIZE, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, TASK_SUBMISSION_ASYNC}
+import org.apache.spark.internal.config.{JOB_GROUP_MAX_SHUFFLE_SIZE, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, REMOVE_EXECUTOR_ON_FETCH_FAILURE, TASK_SUBMISSION_ASYNC}
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener}
 import org.apache.spark.network.shuffle.protocol.MergeStatuses
@@ -220,6 +220,8 @@ private[spark] class DAGScheduler(
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
   private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)

+  private val removeExecutorOnFetchFailure = sc.getConf.get(REMOVE_EXECUTOR_ON_FETCH_FAILURE)
+
   private val shouldMergeResourceProfiles = sc.getConf.get(config.RESOURCE_PROFILE_MERGE_CONFLICTS)

   private val groupToShuffleSize = new ConcurrentHashMap[String, AtomicLong].asScala
@@ -611,6 +613,7 @@
       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,
         shuffleDep.partitioner.numPartitions)
     }
+    log.info(s"Create MapStage:$id with shuffle ${shuffleDep.shuffleId}")
     stage
   }
@@ -2387,18 +2390,25 @@
             // reason to believe shuffle data has been lost for the entire host).
             None
           }
-          removeExecutorAndUnregisterOutputs(
-            execId = bmAddress.executorId,
-            fileLost = true,
-            hostToUnregisterOutputs = hostToUnregisterOutputs,
-            maybeEpoch = Some(task.epoch),
-            // shuffleFileLostEpoch is ignored when a host is decommissioned because some
-            // decommissioned executors on that host might have been removed before this fetch
-            // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and
-            // proceed with unconditional removal of shuffle outputs from all executors on that
-            // host, including from those that we still haven't confirmed as lost due to heartbeat
-            // delays.
-            ignoreShuffleFileLostEpoch = isHostDecommissioned)
+          if (removeExecutorOnFetchFailure) {
+            removeExecutorAndUnregisterOutputs(
+              execId = bmAddress.executorId,
+              fileLost = true,
+              hostToUnregisterOutputs = hostToUnregisterOutputs,
+              maybeEpoch = Some(task.epoch),
+              // shuffleFileLostEpoch is ignored when a host is decommissioned because some
+              // decommissioned executors on that host might have been removed before this fetch
+              // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and
+              // proceed with unconditional removal of shuffle outputs from all executors on that
+              // host, including from those that we still haven't confirmed as lost due to
+              // heartbeat delays.
+              ignoreShuffleFileLostEpoch = isHostDecommissioned)
+          } else {
+            log.info(s"Remove all map status for the shuffle $shuffleId " +
+              s"on the executor: ${bmAddress.executorId}, because of fetch failed")
+            mapOutputTracker.removeOutputsOnExecutorForShuffle(bmAddress.executorId, shuffleId)
+            clearCacheLocs()
+          }
         }
       }
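Read on its own, the change is a single branch on the new flag: the default path keeps the old behaviour (remove the executor and unregister all of its shuffle outputs), while the new path only drops the failed executor's map statuses for the one shuffle that saw the fetch failure. A hedged Scala sketch of that decision; dropExecutor and dropShuffleOutputsOn are hypothetical stand-in callbacks, the real calls being removeExecutorAndUnregisterOutputs and mapOutputTracker.removeOutputsOnExecutorForShuffle shown above.

// Distilled decision logic; the callbacks are placeholders for the DAGScheduler calls.
def handleFetchFailure(
    removeExecutorOnFetchFailure: Boolean,
    execId: String,
    shuffleId: Int,
    dropExecutor: String => Unit,
    dropShuffleOutputsOn: (String, Int) => Unit): Unit = {
  if (removeExecutorOnFetchFailure) {
    dropExecutor(execId)                    // legacy path: executor and all its shuffle files go
  } else {
    dropShuffleOutputsOn(execId, shuffleId) // new path: recompute only the lost map outputs
  }
}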

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 8 additions & 1 deletion
@@ -312,11 +312,18 @@ final class ShuffleBlockFetcherIterator(
             // Increment the ref count because we need to pass this to a different thread.
             // This needs to be released after use.
             buf.retain()
+            val returnBytes = buf.size()
+            val expectBytes = infoMap(blockId)._1
+            if (log.isDebugEnabled && returnBytes != expectBytes) {
+              logDebug(s"task:${context.taskAttemptId()}," +
+                s"block:$blockId, return bytes:$returnBytes," +
+                s"diff:${returnBytes - expectBytes}")
+            }
             remainingBlocks -= blockId
             blockOOMRetryCounts.remove(blockId)
             updateMergedReqsDuration(BlockId(blockId).isShuffleChunk)
             results.put(SuccessFetchResult(BlockId(blockId), infoMap(blockId)._2,
-              address, infoMap(blockId)._1, buf, remainingBlocks.isEmpty))
+              address, expectBytes, buf, remainingBlocks.isEmpty))
             logDebug("remainingBlocks: " + remainingBlocks)
             enqueueDeferredFetchRequestIfNecessary()
           } else {
