Skip to content

Commit eb1be3f

Browse files
SteNicholas authored and cfmcgrady committed
[CELEBORN-1120] ShuffleClientImpl should close batchReviveRequestScheduler of ReviveManager
### What changes were proposed in this pull request?

`ShuffleClientImpl` closes `batchReviveRequestScheduler` of `ReviveManager`.

### Why are the changes needed?

After the shuffle client is closed, `ReviveManager` still schedules invocations of `ShuffleClientImpl#reviveBatch`, which causes a `NullPointerException`. Therefore, `ShuffleClientImpl` should close `batchReviveRequestScheduler` of `ReviveManager` to avoid the `NullPointerException`.

```
23/11/08 18:09:25,819 [batch-revive-scheduler] ERROR ShuffleClientImpl: Exception raised while reviving for shuffle 0 partitionIds 1988, epochs 0,.
java.lang.NullPointerException
	at org.apache.celeborn.client.ShuffleClientImpl.reviveBatch(ShuffleClientImpl.java:705)
	at org.apache.celeborn.client.ReviveManager.lambda$new$1(ReviveManager.java:94)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23/11/08 18:09:25,844 [celeborn-retry-sender-6] ERROR ShuffleClientImpl: Push data to xx.xx.xx.xx:9092 failed for shuffle 0 map 216 attempt 0 partition 1988 batch 2623, remain revive times 4.
org.apache.celeborn.common.exception.CelebornIOException: PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY then revive but REVIVE_FAILED, revive status 12(REVIVE_FAILED), old location: PartitionLocation[ id-epoch:1988-0 host-rpcPort-pushPort-fetchPort-replicatePort:xx.xx.xx.xx-9091-9092-9093-9094 mode:PRIMARY peer:(empty) storage hint:StorageInfo{type=MEMORY, mountPoint='/tmp/storage', finalResult=false, filePath=} mapIdBitMap:null]
	at org.apache.celeborn.client.ShuffleClientImpl.submitRetryPushData(ShuffleClientImpl.java:261)
	at org.apache.celeborn.client.ShuffleClientImpl.access$600(ShuffleClientImpl.java:62)
	at org.apache.celeborn.client.ShuffleClientImpl$3.lambda$onFailure$1(ShuffleClientImpl.java:1045)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2084 from SteNicholas/CELEBORN-1120.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
1 parent 02cea04 commit eb1be3f

File tree

2 files changed

+25
-19
lines changed

2 files changed

+25
-19
lines changed

client/src/main/java/org/apache/celeborn/client/ReviveManager.java

+8-3
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@
2222
import java.util.concurrent.ScheduledExecutorService;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import scala.concurrent.duration.Duration;
26+
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729

@@ -35,17 +37,16 @@ class ReviveManager {
3537
private static final Logger logger = LoggerFactory.getLogger(ReviveManager.class);
3638

3739
LinkedBlockingQueue<ReviveRequest> requestQueue = new LinkedBlockingQueue<>();
38-
private final long interval;
3940
private final int batchSize;
4041
ShuffleClientImpl shuffleClient;
41-
private ScheduledExecutorService batchReviveRequestScheduler =
42+
private final ScheduledExecutorService batchReviveRequestScheduler =
4243
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler");
4344

4445
public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) {
4546
this.shuffleClient = shuffleClient;
46-
this.interval = conf.clientPushReviveInterval();
4747
this.batchSize = conf.clientPushReviveBatchSize();
4848

49+
long interval = conf.clientPushReviveInterval();
4950
batchReviveRequestScheduler.scheduleWithFixedDelay(
5051
() -> {
5152
Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
@@ -124,4 +125,8 @@ public void addRequest(ReviveRequest request) {
124125
logger.error("Exception when put into requests!", e);
125126
}
126127
}
128+
129+
public void close() {
130+
ThreadUtils.shutdown(batchReviveRequestScheduler, Duration.apply("800ms"));
131+
}
127132
}

client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java

+17-16
Original file line number | Diff line number | Diff line change
@@ -73,8 +73,8 @@ public class ShuffleClientImpl extends ShuffleClient {
7373

7474
private final int registerShuffleMaxRetries;
7575
private final long registerShuffleRetryWaitMs;
76-
private int maxReviveTimes;
77-
private boolean testRetryRevive;
76+
private final int maxReviveTimes;
77+
private final boolean testRetryRevive;
7878
private final int pushBufferMaxSize;
7979
protected final long pushDataTimeout;
8080

@@ -113,7 +113,7 @@ public class ShuffleClientImpl extends ShuffleClient {
113113

114114
protected final String appUniqueId;
115115

116-
private ThreadLocal<Compressor> compressorThreadLocal =
116+
private final ThreadLocal<Compressor> compressorThreadLocal =
117117
new ThreadLocal<Compressor>() {
118118
@Override
119119
protected Compressor initialValue() {
@@ -601,13 +601,13 @@ protected void limitZeroInFlight(String mapKey, PushState pushState) throws IOEx
601601
}
602602

603603
/**
604-
* check if a newer PartitionLocation(with larger epoch) exists in local cache
604+
* Check if a newer PartitionLocation(with larger epoch) exists in local cache.
605605
*
606-
* @param shuffleMap
607-
* @param partitionId
608-
* @param epoch
609-
* @param wait whether to wait for some time for a newer PartitionLocation
610-
* @return
606+
* @param shuffleMap The mapping between shuffle id and partition location.
607+
* @param partitionId The id of partition.
608+
* @param epoch The epoch of revive.
609+
* @param wait Whether to wait for some time for a newer partition location.
610+
* @return whether newer partition location exists in local cache.
611611
*/
612612
boolean newerPartitionLocationExists(
613613
Map<Integer, PartitionLocation> shuffleMap, int partitionId, int epoch, boolean wait) {
@@ -675,12 +675,10 @@ private boolean revive(
675675
attemptId,
676676
partitionId);
677677
return true;
678-
} else if (results == null
679-
|| !results.containsKey(partitionId)
680-
|| results.get(partitionId) != StatusCode.SUCCESS.getValue()) {
681-
return false;
682678
} else {
683-
return true;
679+
return results != null
680+
&& results.containsKey(partitionId)
681+
&& results.get(partitionId) == StatusCode.SUCCESS.getValue();
684682
}
685683
}
686684

@@ -1595,7 +1593,7 @@ public CelebornInputStream readPartition(
15951593
throws IOException {
15961594
ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
15971595

1598-
if (fileGroups.partitionGroups.size() == 0
1596+
if (fileGroups.partitionGroups.isEmpty()
15991597
|| !fileGroups.partitionGroups.containsKey(partitionId)) {
16001598
logger.warn("Shuffle data is empty for shuffle {} partition {}.", shuffleId, partitionId);
16011599
return CelebornInputStream.empty();
@@ -1622,6 +1620,9 @@ public Map<Integer, ReduceFileGroups> getReduceFileGroupsMap() {
16221620

16231621
@Override
16241622
public void shutdown() {
1623+
if (null != reviveManager) {
1624+
reviveManager.close();
1625+
}
16251626
if (null != rpcEnv) {
16261627
rpcEnv.shutdown();
16271628
}
@@ -1666,7 +1667,7 @@ private StatusCode getPushDataFailCause(String message) {
16661667
logger.debug("Push data failed cause message: {}", message);
16671668
StatusCode cause;
16681669
if (message == null) {
1669-
logger.error("Push data throw unexpected exception: {}", message);
1670+
logger.error("Push data throw unexpected exception");
16701671
cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
16711672
} else if (message.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
16721673
cause = StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA;

0 commit comments

Comments
 (0)