diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index d6185f089d3c..8fd8dd91cc4a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -18,6 +18,10 @@ package org.apache.spark.network.shuffle; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -105,7 +109,23 @@ public void fetchBlocks( (blockIds1, listener1) -> { // Unless this client is closed. if (clientFactory != null) { - TransportClient client = clientFactory.createClient(host, port); + TransportClient client = null; + try { + client = clientFactory.createClient(host, port); + } catch (InterruptedException e) { + throw e; + } catch (IOException e) { + if (!isHostReachable(host, 3000) || !isHostPortConnectable(host, port)) { + // throw ExternalShuffleServiceLostException exception then we won't retry to connect + // un-connected External Shuffle Service. + String msg = "The relative remote external shuffle service (host: " + host + "," + + "port: " + port + "), which maintains the block data can't been connected."; + logger.info(msg); + throw new ExternalShuffleServiceLostException(msg); + } else { + throw e; + } + } new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf, downloadFileManager).start(); } else { @@ -129,6 +149,34 @@ public void fetchBlocks( } } + public static boolean isHostPortConnectable(String host, int port) { + Socket socket = new Socket(); + try { + socket.connect(new InetSocketAddress(host, port)); + } catch (IOException e) { + e.printStackTrace(); + return false; + } finally { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + return true; + } + + public static boolean isHostReachable(String host, Integer timeOut) { + try { + return InetAddress.getByName(host).isReachable(timeOut); + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return false; + } + @Override public MetricSet shuffleMetrics() { checkInit(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java new file mode 100644 index 000000000000..46e3709e9cdc --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +/** + * Exception throw when we can't connect to external shuffle service to sto unnecessary + * retry in ExternalShuffleStoreClient.fetchBlocks() + */ +public class ExternalShuffleServiceLostException extends Exception { + + ExternalShuffleServiceLostException(String message) { + super(message); + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index 6bf3da94030d..9b1982d9969d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -58,7 +58,7 @@ public interface BlockFetchStarter { * issues. */ void createAndStart(String[] blockIds, BlockFetchingListener listener) - throws IOException, InterruptedException; + throws IOException, InterruptedException, ExternalShuffleServiceLostException; } /** Shared executor service used for waiting and retrying. */ diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 6f90df5f611a..905821d14ed5 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -291,7 +291,11 @@ private static void performInteractions(List> inte } assertNotNull(stub); - stub.when(fetchStarter).createAndStart(any(), any()); + try { + stub.when(fetchStarter).createAndStart(any(), any()); + } catch (ExternalShuffleServiceLostException e) { + throw new IOException(e); + } String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start(); }