2525import java .util .concurrent .TimeUnit ;
2626
2727import com .google .common .annotations .VisibleForTesting ;
28+ import com .google .common .base .Preconditions ;
2829import com .google .common .collect .Sets ;
2930import com .google .common .util .concurrent .Uninterruptibles ;
3031import org .slf4j .Logger ;
@@ -87,7 +88,16 @@ void createAndStart(String[] blockIds, BlockTransferListener listener)
8788 /** Number of times we've attempted to retry so far. */
8889 private int retryCount = 0 ;
8990
90- private boolean saslTimeoutSeen ;
91+ // Number of times a SASL timeout has been retried without success.
92+ // If we see maxRetries consecutive failures, the request is failed.
93+ // On the other hand, if SASL succeeds and we are able to send other requests subsequently,
94+ // we subtract the SASL failures from retryCount (since the SASL failures were part of
95+ // connection bootstrap - which ended up being successful).
96+ // spark.network.auth.rpcTimeout is much lower than spark.network.timeout and others -
97+ // and so SASL is more susceptible to failures when a remote service
98+ // (like the external shuffle service) is under load: but once it succeeds, we do not want to
99+ // include it as part of request retries.
100+ private int saslRetryCount = 0 ;
91101
92102 /**
93103 * Set of all block ids which have not been transferred successfully or with a non-IO Exception.
@@ -123,7 +133,7 @@ public RetryingBlockTransferor(
123133 this .currentListener = new RetryingBlockTransferListener ();
124134 this .errorHandler = errorHandler ;
125135 this .enableSaslRetries = conf .enableSaslRetries ();
126- this .saslTimeoutSeen = false ;
136+ this .saslRetryCount = 0 ;
127137 }
128138
129139 public RetryingBlockTransferor (
@@ -167,7 +177,7 @@ private void transferAllOutstanding() {
167177 numRetries > 0 ? "(after " + numRetries + " retries)" : "" ), e );
168178
169179 if (shouldRetry (e )) {
170- initiateRetry ();
180+ initiateRetry (e );
171181 } else {
172182 for (String bid : blockIdsToTransfer ) {
173183 listener .onBlockTransferFailure (bid , e );
@@ -180,7 +190,10 @@ private void transferAllOutstanding() {
180190 * Lightweight method which initiates a retry in a different thread. The retry will involve
181191 * calling transferAllOutstanding() after a configured wait time.
182192 */
183- private synchronized void initiateRetry () {
193+ private synchronized void initiateRetry (Throwable e ) {
194+ if (enableSaslRetries && e instanceof SaslTimeoutException ) {
195+ saslRetryCount += 1 ;
196+ }
184197 retryCount += 1 ;
185198 currentListener = new RetryingBlockTransferListener ();
186199
@@ -203,16 +216,17 @@ private synchronized boolean shouldRetry(Throwable e) {
203216 boolean isIOException = e instanceof IOException
204217 || e .getCause () instanceof IOException ;
205218 boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException ;
206- if (!isSaslTimeout && saslTimeoutSeen ) {
207- retryCount = 0 ;
208- saslTimeoutSeen = false ;
219+ // If this is a non-SASL request failure, subtract earlier SASL failures from retryCount,
220+ // since some subsequent SASL attempt was successful.
221+ if (!isSaslTimeout && saslRetryCount > 0 ) {
222+ Preconditions .checkState (retryCount >= saslRetryCount ,
223+ "retryCount must be greater than or equal to saslRetryCount" );
224+ retryCount -= saslRetryCount ;
225+ saslRetryCount = 0 ;
209226 }
210227 boolean hasRemainingRetries = retryCount < maxRetries ;
211228 boolean shouldRetry = (isSaslTimeout || isIOException ) &&
212229 hasRemainingRetries && errorHandler .shouldRetryError (e );
213- if (shouldRetry && isSaslTimeout ) {
214- this .saslTimeoutSeen = true ;
215- }
216230 return shouldRetry ;
217231 }
218232
@@ -236,9 +250,13 @@ private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) {
236250 if (this == currentListener && outstandingBlocksIds .contains (blockId )) {
237251 outstandingBlocksIds .remove (blockId );
238252 shouldForwardSuccess = true ;
239- if (saslTimeoutSeen ) {
240- retryCount = 0 ;
241- saslTimeoutSeen = false ;
253+ // If there were SASL failures earlier, subtract them from retryCount, as there was
254+ // a SASL success (and some other post-bootstrap request was also successful).
255+ if (saslRetryCount > 0 ) {
256+ Preconditions .checkState (retryCount >= saslRetryCount ,
257+ "retryCount must be greater than or equal to saslRetryCount" );
258+ retryCount -= saslRetryCount ;
259+ saslRetryCount = 0 ;
242260 }
243261 }
244262 }
@@ -256,7 +274,7 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
256274 synchronized (RetryingBlockTransferor .this ) {
257275 if (this == currentListener && outstandingBlocksIds .contains (blockId )) {
258276 if (shouldRetry (exception )) {
259- initiateRetry ();
277+ initiateRetry (exception );
260278 } else {
261279 if (errorHandler .shouldLogError (exception )) {
262280 logger .error (
0 commit comments