diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 04a8a1aaa1db..ada016e271ff 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -146,8 +146,11 @@ public void connect(String encodedToken) throws Exception { connectToDatanode(dn, encodedToken); } - private void connectToDatanode(DatanodeDetails dn, String encodedToken) - throws IOException { + private synchronized void connectToDatanode(DatanodeDetails dn, + String encodedToken) throws IOException { + if (isConnected(dn)){ + return; + } // read port from the data node, on failure use default configured // port. int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue(); @@ -202,7 +205,7 @@ private boolean isConnected(ManagedChannel channel) { } @Override - public void close() { + public synchronized void close() { closed = true; for (ManagedChannel channel : channels.values()) { channel.shutdownNow(); @@ -368,19 +371,9 @@ public XceiverClientReply sendCommandAsync( private XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request, DatanodeDetails dn) - throws IOException, ExecutionException, InterruptedException { - if (closed) { - throw new IOException("This channel is not connected."); - } - + throws IOException, InterruptedException { + checkOpen(dn, request.getEncodedToken()); UUID dnId = dn.getUuid(); - ManagedChannel channel = channels.get(dnId); - // If the channel doesn't exist for this specific datanode or the channel - // is closed, just reconnect - String token = request.getEncodedToken(); - if (!isConnected(channel)) { - reconnect(dn, token); - } if (LOG.isDebugEnabled()) { LOG.debug("Send command {} to datanode {}", request.getCmdType().toString(), dn.getNetworkFullPath()); @@ -427,6 +420,21 @@ public void onCompleted() { return new XceiverClientReply(replyFuture); } + private synchronized void checkOpen(DatanodeDetails dn, String encodedToken) + throws IOException{ + if (closed) { + throw new IOException("This channel is not connected."); + } + + ManagedChannel channel = channels.get(dn.getUuid()); + // If the channel doesn't exist for this specific datanode or the channel + // is closed, just reconnect + if (!isConnected(channel)) { + reconnect(dn, encodedToken); + } + + } + private void reconnect(DatanodeDetails dn, String encodedToken) throws IOException { ManagedChannel channel; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index b15828a15309..761cc7369903 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -235,7 +235,6 @@ public XceiverClientSpi call() throws Exception { case RATIS: client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, caCert); - client.connect(); break; case STAND_ALONE: client = new XceiverClientGrpc(pipeline, conf, caCert); @@ -244,6 +243,7 @@ public XceiverClientSpi call() throws Exception { default: throw new IOException("not implemented" + pipeline.getType()); } + client.connect(); return client; } });