From 4b610c87d65a39a33939621e7299d7a5881515c0 Mon Sep 17 00:00:00 2001 From: Steve Vaughan Jr Date: Mon, 25 Jul 2022 09:49:39 -0400 Subject: [PATCH 1/6] HADOOP-18365. Update the remote address when a change is detected Avoid reconnecting to the old address after detecting that the address has been updated. --- .../src/main/java/org/apache/hadoop/ipc/Client.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 534824e204fa4..b8fc04e6e0160 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -645,6 +645,8 @@ private synchronized boolean updateAddress() throws IOException { LOG.warn("Address change detected. Old: " + server.toString() + " New: " + currentAddr.toString()); server = currentAddr; + // Update the remote address so that reconnections are with the updated address. This avoids thrashing. + remoteId.setAddress(currentAddr); UserGroupInformation ticket = remoteId.getTicket(); this.setName("IPC Client (" + socketFactory.hashCode() + ") connection to " + server.toString() + " from " @@ -1753,7 +1755,16 @@ public ConnectionId(InetSocketAddress address, Class protocol, InetSocketAddress getAddress() { return address; } - + + /** + * Used to update the remote address when an address change is detected. + * + * @param address the new address + */ + private void setAddress(InetSocketAddress address) { + this.address = address; + } + Class getProtocol() { return protocol; } From 590f5ba0dd97af98bbca841ba6f9bc604e6e7489 Mon Sep 17 00:00:00 2001 From: Steve Vaughan Jr Date: Thu, 4 Aug 2022 12:13:03 -0400 Subject: [PATCH 2/6] Fix Checkstyle line length violation --- .../src/main/java/org/apache/hadoop/ipc/Client.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index b8fc04e6e0160..e798bd8023746 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -645,7 +645,8 @@ private synchronized boolean updateAddress() throws IOException { LOG.warn("Address change detected. Old: " + server.toString() + " New: " + currentAddr.toString()); server = currentAddr; - // Update the remote address so that reconnections are with the updated address. This avoids thrashing. + // Update the remote address so that reconnections are with the updated address. This + // avoids thrashing. remoteId.setAddress(currentAddr); UserGroupInformation ticket = remoteId.getTicket(); this.setName("IPC Client (" + socketFactory.hashCode() From 168be14ed8a736fcee3c42d14e11046c96a97133 Mon Sep 17 00:00:00 2001 From: Steve Vaughan Jr Date: Sat, 6 Aug 2022 10:31:23 -0400 Subject: [PATCH 3/6] Keep ConnectionId as Immutable for map key The ConnectionId is used as a key in the connections map, and updating the remoteId caused problems with the cleanup of connections when the removeMethod was used. Instead of updating the address within the remoteId, use the removeMethod to cleanup references to the current identifier and then replace it with a new identifier using the updated address. --- .../java/org/apache/hadoop/ipc/Client.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index e798bd8023746..30003e482707f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -419,7 +419,8 @@ public synchronized Writable getRpcResponse() { * socket: responses may be delivered out of order. */ private class Connection extends Thread { private InetSocketAddress server; // server ip:port - private final ConnectionId remoteId; // connection id + // This remoteId needs to be mutable in order to handle updated addresses + private ConnectionId remoteId; // connection id private AuthMethod authMethod; // authentication method private AuthProtocol authProtocol; private int serviceClass; @@ -645,9 +646,11 @@ private synchronized boolean updateAddress() throws IOException { LOG.warn("Address change detected. Old: " + server.toString() + " New: " + currentAddr.toString()); server = currentAddr; - // Update the remote address so that reconnections are with the updated address. This - // avoids thrashing. - remoteId.setAddress(currentAddr); + // Update the remote address so that reconnections are with the updated address. + // This avoids thrashing. We remove the old connection and then replace the remoteId + // because it is an immutable class that is used as a key in the connections map. + removeMethod.accept(this); + remoteId = ConnectionId.updateAddress(remoteId, currentAddr); UserGroupInformation ticket = remoteId.getTicket(); this.setName("IPC Client (" + socketFactory.hashCode() + ") connection to " + server.toString() + " from " @@ -1720,7 +1723,21 @@ public static class ConnectionId { private final int pingInterval; // how often sends ping to the server in msecs private String saslQop; // here for testing private final Configuration conf; // used to get the expected kerberos principal name - + + /** + * Creates a new identifier with an updated address, maintaining all other settings. This is + * used to update the remote address when an address change is detected. + * + * @param original the identifier that will be replaced + * @param address the updated address + * @return a replacement identifier + * @see Connection#updateAddress() + */ + private static ConnectionId updateAddress(ConnectionId original, InetSocketAddress address) { + return new ConnectionId(address, original.protocol, original.ticket, original.rpcTimeout, + original.connectionRetryPolicy, original.conf); + } + public ConnectionId(InetSocketAddress address, Class protocol, UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy, Configuration conf) { @@ -1757,15 +1774,6 @@ InetSocketAddress getAddress() { return address; } - /** - * Used to update the remote address when an address change is detected. - * - * @param address the new address - */ - private void setAddress(InetSocketAddress address) { - this.address = address; - } - Class getProtocol() { return protocol; } From 1f1fdcf220556acdc652d8a00693c7d314e1dd8f Mon Sep 17 00:00:00 2001 From: Steve Vaughan Jr Date: Sat, 6 Aug 2022 10:32:50 -0400 Subject: [PATCH 4/6] Use final to protect immutable ConnectionId Mark non-test fields as private and final, and add a missing accessor. --- .../src/main/java/org/apache/hadoop/ipc/Client.java | 6 +++--- .../main/java/org/apache/hadoop/ipc/WritableRpcEngine.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 30003e482707f..73a7d60e53b35 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1706,9 +1706,9 @@ private Connection getConnection(ConnectionId remoteId, @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Evolving public static class ConnectionId { - InetSocketAddress address; - UserGroupInformation ticket; - final Class protocol; + private final InetSocketAddress address; + private final UserGroupInformation ticket; + private final Class protocol; private static final int PRIME = 16777619; private final int rpcTimeout; private final int maxIdleTime; //connections will be culled if it was idle for diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 2a19ad29a6db2..3e4ee707d46ea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -323,7 +323,7 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, Client.ConnectionId connId, Configuration conf, SocketFactory factory) throws IOException { return getProxy(protocol, clientVersion, connId.getAddress(), - connId.ticket, conf, factory, connId.getRpcTimeout(), + connId.getTicket(), conf, factory, connId.getRpcTimeout(), connId.getRetryPolicy(), null, null); } From cac283768bdcd4e9921ac6dc9ea9491f8157cfd3 Mon Sep 17 00:00:00 2001 From: Steve Vaughan Jr Date: Mon, 8 Aug 2022 10:01:55 -0400 Subject: [PATCH 5/6] Use a stable hashCode to allow safe IP addr changes --- .../java/org/apache/hadoop/ipc/Client.java | 52 ++++++++------ .../java/org/apache/hadoop/ipc/TestIPC.java | 67 +++++++++++++++++++ 2 files changed, 97 insertions(+), 22 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 73a7d60e53b35..20fc9efe57e0a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -419,8 +419,7 @@ public synchronized Writable getRpcResponse() { * socket: responses may be delivered out of order. */ private class Connection extends Thread { private InetSocketAddress server; // server ip:port - // This remoteId needs to be mutable in order to handle updated addresses - private ConnectionId remoteId; // connection id + private final ConnectionId remoteId; // connection id private AuthMethod authMethod; // authentication method private AuthProtocol authProtocol; private int serviceClass; @@ -647,10 +646,8 @@ private synchronized boolean updateAddress() throws IOException { " New: " + currentAddr.toString()); server = currentAddr; // Update the remote address so that reconnections are with the updated address. - // This avoids thrashing. We remove the old connection and then replace the remoteId - // because it is an immutable class that is used as a key in the connections map. - removeMethod.accept(this); - remoteId = ConnectionId.updateAddress(remoteId, currentAddr); + // This avoids thrashing. + remoteId.setAddress(currentAddr); UserGroupInformation ticket = remoteId.getTicket(); this.setName("IPC Client (" + socketFactory.hashCode() + ") connection to " + server.toString() + " from " @@ -1706,7 +1703,7 @@ private Connection getConnection(ConnectionId remoteId, @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Evolving public static class ConnectionId { - private final InetSocketAddress address; + private InetSocketAddress address; private final UserGroupInformation ticket; private final Class protocol; private static final int PRIME = 16777619; @@ -1724,20 +1721,6 @@ public static class ConnectionId { private String saslQop; // here for testing private final Configuration conf; // used to get the expected kerberos principal name - /** - * Creates a new identifier with an updated address, maintaining all other settings. This is - * used to update the remote address when an address change is detected. - * - * @param original the identifier that will be replaced - * @param address the updated address - * @return a replacement identifier - * @see Connection#updateAddress() - */ - private static ConnectionId updateAddress(ConnectionId original, InetSocketAddress address) { - return new ConnectionId(address, original.protocol, original.ticket, original.rpcTimeout, - original.connectionRetryPolicy, original.conf); - } - public ConnectionId(InetSocketAddress address, Class protocol, UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy, Configuration conf) { @@ -1774,6 +1757,27 @@ InetSocketAddress getAddress() { return address; } + /** + * This is used to update the remote address when an address change is detected. This method + * ensures that the {@link #hashCode()} won't change. + * + * @param address the updated address + * @throws IllegalArgumentException if the hostname or port doesn't match + * @see Connection#updateAddress() + */ + void setAddress(InetSocketAddress address) { + if (!Objects.equals(this.address.getHostName(), address.getHostName())) { + throw new IllegalArgumentException("Hostname must match: " + this.address + " vs " + + address); + } + if (this.address.getPort() != address.getPort()) { + throw new IllegalArgumentException("Port must match: " + this.address + " vs " + address); + } + + this.address = address; + } + + Class getProtocol() { return protocol; } @@ -1884,7 +1888,11 @@ && isEqual(this.protocol, that.protocol) @Override public int hashCode() { int result = connectionRetryPolicy.hashCode(); - result = PRIME * result + ((address == null) ? 0 : address.hashCode()); + // We calculate based on the host name and port without the IP address, since the hashCode + // must be stable even if the IP address is updated. + result = PRIME * result + ((address == null || address.getHostName() == null) ? 0 : + address.getHostName().hashCode()); + result = PRIME * result + ((address == null) ? 0 : address.getPort()); result = PRIME * result + (doPing ? 1231 : 1237); result = PRIME * result + maxIdleTime; result = PRIME * result + pingInterval; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 95ff302103d89..1685e0bdc2e6f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -93,6 +94,7 @@ import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.Whitebox; import org.apache.hadoop.util.StringUtils; +import org.assertj.core.api.Condition; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -815,6 +817,71 @@ public Void call() throws IOException { } } + /** + * The {@link ConnectionId#hashCode} has to be stable despite updates that occur as the the + * address evolves over time. The {@link ConnectionId} is used as a primary key in maps, so + * its hashCode can't change. + * + * @throws IOException if there is a client or server failure + */ + @Test + public void testStableHashCode() throws IOException { + Server server = new TestServer(5, false); + try { + server.start(); + + // Leave host unresolved to start. Use "localhost" as opposed + // to local IP from NetUtils.getConnectAddress(server) to force + // resolution later + InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved( + "localhost", NetUtils.getConnectAddress(server).getPort()); + + // Setup: Create a ConnectionID using an unresolved address, and get it's hashCode to serve + // as a point of comparison. + int rpcTimeout = MIN_SLEEP_TIME * 2; + final ConnectionId remoteId = getConnectionId(unresolvedAddr, rpcTimeout, conf); + int expected = remoteId.hashCode(); + + // Start client + Client.setConnectTimeout(conf, 100); + Client client = new Client(LongWritable.class, conf); + try { + // Test: Call should re-resolve host and succeed + LongWritable param = new LongWritable(RANDOM.nextLong()); + client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, + RPC.RPC_SERVICE_CLASS_DEFAULT, null); + int actual = remoteId.hashCode(); + + // Verify: The hashCode should match, although the InetAddress is different since it has + // now been resolved + assertThat(remoteId.getAddress()).isNotEqualTo(unresolvedAddr); + assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName()); + assertThat(remoteId.hashCode()).isEqualTo(expected); + + // Verify: The hashCode is protected against updates to the host name + String hostName = InetAddress.getLocalHost().getHostName(); + InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostName(), + remoteId.getAddress().getPort()); + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> remoteId.setAddress(mismatchedHostName)) + .withMessageStartingWith("Hostname must match"); + + // Verify: The hashCode is protected against updates to the port + InetSocketAddress mismatchedPort = NetUtils.createSocketAddr( + remoteId.getAddress().getHostName(), + remoteId.getAddress().getPort() + 1); + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> remoteId.setAddress(mismatchedPort)) + .withMessageStartingWith("Port must match"); + } finally { + client.stop(); + } + } finally { + server.stop(); + } + } + @Test(timeout=60000) public void testIpcFlakyHostResolution() throws IOException { // start server From 03cf96b24012c8e7f34c76be6ec1c0a2b0b5f98e Mon Sep 17 00:00:00 2001 From: Steve Vaughan Jr Date: Thu, 11 Aug 2022 10:48:23 -0400 Subject: [PATCH 6/6] Add test that updated address is used Once the address has been updated, it should be used in future calls. Check to ensure that a second request succeeds and that it uses the existing updated address instead of having to re-resolve. --- .../src/test/java/org/apache/hadoop/ipc/TestIPC.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 1685e0bdc2e6f..1e780793a6d24 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -858,6 +858,16 @@ public void testStableHashCode() throws IOException { assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName()); assertThat(remoteId.hashCode()).isEqualTo(expected); + // Test: Call should succeed without having to re-resolve + InetSocketAddress expectedSocketAddress = remoteId.getAddress(); + param = new LongWritable(RANDOM.nextLong()); + client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, + RPC.RPC_SERVICE_CLASS_DEFAULT, null); + + // Verify: The same instance of the InetSocketAddress has been used to make the second + // call + assertThat(remoteId.getAddress()).isSameAs(expectedSocketAddress); + // Verify: The hashCode is protected against updates to the host name String hostName = InetAddress.getLocalHost().getHostName(); InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr(