From b9c30751c6ccabe27b158fbfbe84ee8758f0c662 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 31 Jan 2020 22:00:22 +0800 Subject: [PATCH 1/5] Update ExternalBlockStoreClient.java --- .../shuffle/ExternalBlockStoreClient.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index d6185f089d3c..6f219db16db7 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -103,14 +103,22 @@ public void fetchBlocks( try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (blockIds1, listener1) -> { - // Unless this client is closed. - if (clientFactory != null) { - TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockFetcher(client, appId, execId, - blockIds1, listener1, conf, downloadFileManager).start(); - } else { - logger.info("This clientFactory was closed. Skipping further block fetch retries."); - } + try{ + // Unless this client is closed. + if (clientFactory != null) { + TransportClient client = clientFactory.createClient(host, port); + new OneForOneBlockFetcher(client, appId, execId, + blockIds1, listener1, conf, downloadFileManager).start(); + } else { + logger.info("This clientFactory was closed. 
Skipping further block fetch retries."); + } + } catch (IOException e) { + logger.info("The relative remote external shuffle service (host: " + host + "," + + "port: "+ port + "), which maintains the block data can't been connected."); + throw e; + } catch (Exception e) { + throw e; + } }; int maxRetries = conf.maxIORetries(); From 32d2be877b7f70b900b176038a1108488e943364 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 3 Feb 2020 23:15:14 +0800 Subject: [PATCH 2/5] follow comment --- .../shuffle/ExternalBlockStoreClient.java | 36 ++++++++++--------- .../ExternalShuffleServiceLostException.java | 29 +++++++++++++++ .../network/shuffle/RetryingBlockFetcher.java | 2 +- 3 files changed, 50 insertions(+), 17 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 6f219db16db7..41dc1d374050 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -103,22 +103,26 @@ public void fetchBlocks( try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (blockIds1, listener1) -> { - try{ - // Unless this client is closed. - if (clientFactory != null) { - TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockFetcher(client, appId, execId, - blockIds1, listener1, conf, downloadFileManager).start(); - } else { - logger.info("This clientFactory was closed. 
Skipping further block fetch retries."); - } - } catch (IOException e) { - logger.info("The relative remote external shuffle service (host: " + host + "," + - "port: "+ port + "), which maintains the block data can't been connected."); - throw e; - } catch (Exception e) { - throw e; - } + + // Unless this client is closed. + if (clientFactory != null) { + TransportClient client = null; + try { + client = clientFactory.createClient(host, port); + } catch (Exception e) { + // throw ExternalShuffleServiceLostException exception then we won't retry to connect + // un-connected External Shuffle Service. + String msg = "The relative remote external shuffle service (host: " + host + "," + + "port: " + port + "), which maintains the block data can't been connected."; + logger.info(msg); + throw new ExternalShuffleServiceLostException(msg); + } + new OneForOneBlockFetcher(client, appId, execId, + blockIds1, listener1, conf, downloadFileManager).start(); + } else { + logger.info("This clientFactory was closed. Skipping further block fetch retries."); + } + }; int maxRetries = conf.maxIORetries(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java new file mode 100644 index 000000000000..46e3709e9cdc --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleServiceLostException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +/** + * Exception thrown when we can't connect to the external shuffle service, to stop unnecessary + * retries in ExternalBlockStoreClient.fetchBlocks() + */ +public class ExternalShuffleServiceLostException extends Exception { + + ExternalShuffleServiceLostException(String message) { + super(message); + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index 6bf3da94030d..9b1982d9969d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -58,7 +58,7 @@ public interface BlockFetchStarter { * issues. */ void createAndStart(String[] blockIds, BlockFetchingListener listener) - throws IOException, InterruptedException; + throws IOException, InterruptedException, ExternalShuffleServiceLostException; } /** Shared executor service used for waiting and retrying. 
*/ From 40dd0765e14cf59ae483d5bd2fd05f391757aef0 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 3 Feb 2020 23:23:25 +0800 Subject: [PATCH 3/5] Update RetryingBlockFetcherSuite.java --- .../spark/network/shuffle/RetryingBlockFetcherSuite.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 6f90df5f611a..905821d14ed5 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -291,7 +291,11 @@ private static void performInteractions(List> inte } assertNotNull(stub); - stub.when(fetchStarter).createAndStart(any(), any()); + try { + stub.when(fetchStarter).createAndStart(any(), any()); + } catch (ExternalShuffleServiceLostException e) { + throw new IOException(e); + } String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start(); } From c9061c5651e34f4134ab9a8c34f0d124f641f06c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 4 Feb 2020 22:55:30 +0800 Subject: [PATCH 4/5] Update ExternalBlockStoreClient.java --- .../spark/network/shuffle/ExternalBlockStoreClient.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 41dc1d374050..51b6b29ac0a7 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ 
-103,7 +103,6 @@ public void fetchBlocks( try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (blockIds1, listener1) -> { - // Unless this client is closed. if (clientFactory != null) { TransportClient client = null; @@ -113,16 +112,15 @@ public void fetchBlocks( // throw ExternalShuffleServiceLostException exception then we won't retry to connect // un-connected External Shuffle Service. String msg = "The relative remote external shuffle service (host: " + host + "," + - "port: " + port + "), which maintains the block data can't been connected."; + "port: " + port + "), which maintains the block data can't been connected."; logger.info(msg); throw new ExternalShuffleServiceLostException(msg); } new OneForOneBlockFetcher(client, appId, execId, - blockIds1, listener1, conf, downloadFileManager).start(); + blockIds1, listener1, conf, downloadFileManager).start(); } else { logger.info("This clientFactory was closed. Skipping further block fetch retries."); } - }; int maxRetries = conf.maxIORetries(); From 2c4995e985baca97838523954395dc68b38b0c3c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 12 Feb 2020 16:24:59 +0800 Subject: [PATCH 5/5] check host rechable and port connectable --- .../shuffle/ExternalBlockStoreClient.java | 52 ++++++++++++++++--- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 51b6b29ac0a7..8fd8dd91cc4a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -18,6 +18,10 @@ package org.apache.spark.network.shuffle; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import 
java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -108,13 +112,19 @@ public void fetchBlocks( TransportClient client = null; try { client = clientFactory.createClient(host, port); - } catch (Exception e) { - // throw ExternalShuffleServiceLostException exception then we won't retry to connect - // un-connected External Shuffle Service. - String msg = "The relative remote external shuffle service (host: " + host + "," + - "port: " + port + "), which maintains the block data can't been connected."; - logger.info(msg); - throw new ExternalShuffleServiceLostException(msg); + } catch (InterruptedException e) { + throw e; + } catch (IOException e) { + if (!isHostReachable(host, 3000) || !isHostPortConnectable(host, port)) { + // throw ExternalShuffleServiceLostException exception then we won't retry to connect + // un-connected External Shuffle Service. + String msg = "The relative remote external shuffle service (host: " + host + "," + + "port: " + port + "), which maintains the block data can't been connected."; + logger.info(msg); + throw new ExternalShuffleServiceLostException(msg); + } else { + throw e; + } } new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf, downloadFileManager).start(); @@ -139,6 +149,34 @@ public void fetchBlocks( } } + public static boolean isHostPortConnectable(String host, int port) { + Socket socket = new Socket(); + try { + socket.connect(new InetSocketAddress(host, port)); + } catch (IOException e) { + e.printStackTrace(); + return false; + } finally { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + return true; + } + + public static boolean isHostReachable(String host, Integer timeOut) { + try { + return InetAddress.getByName(host).isReachable(timeOut); + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return false; + } + @Override public MetricSet 
shuffleMetrics() { checkInit();