From 59ca62d26a96623d55d36b2f978e0a2f53c53c69 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Sat, 28 Dec 2024 21:41:41 +0800 Subject: [PATCH 1/4] HDFS-17699. [ARR]Avoid adding calls indefinitely make Router Out-of-Memory. --- .../java/org/apache/hadoop/ipc/Client.java | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index f052aa2d8a260..774b6c9a43e13 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -204,6 +204,8 @@ public static Object getExternalHandler() { private final byte[] clientId; private final int maxAsyncCalls; private final AtomicInteger asyncCallCounter = new AtomicInteger(0); + private final ConcurrentMap asyncCallCounters = + new ConcurrentHashMap<>(); /** * set the ping interval value in configuration @@ -1246,6 +1248,7 @@ private void receiveRpcResponse() { if (status == RpcStatusProto.SUCCESS) { Writable value = packet.newInstance(valueClass, conf); final Call call = calls.remove(callId); + releaseAsyncCallPermit(); if (call.alignmentContext != null) { call.alignmentContext.receiveResponseState(header); } @@ -1269,6 +1272,7 @@ private void receiveRpcResponse() { RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode); if (status == RpcStatusProto.ERROR) { final Call call = calls.remove(callId); + releaseAsyncCallPermit(); call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection @@ -1344,6 +1348,13 @@ private void cleanupCalls() { c.setException(closeException); // local exception } } + + private void releaseAsyncCallPermit() { + Semaphore asyncCallPermits = asyncCallCounters.get(remoteId); + if (asyncCallPermits != null) { + asyncCallPermits.release(1); + } + } } /** @@ -1459,16 +1470,28 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, fallbackToSimpleAuth, alignmentContext); } - private void checkAsyncCall() throws IOException { + private void checkAsyncCall(ConnectionId remoteId) throws IOException { if (isAsynchronousMode()) { - if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) { - asyncCallCounter.decrementAndGet(); - String errMsg = String.format( - "Exceeded limit of max asynchronous calls: %d, " + - "please configure %s to adjust it.", - maxAsyncCalls, - CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); - throw new AsyncCallLimitExceededException(errMsg); + Semaphore asyncPermits = asyncCallCounters.computeIfAbsent(remoteId, + id -> new Semaphore(maxAsyncCalls)); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Acquiring lock for connectionId {}", remoteId); + } + // TODO timeout param configurable. + boolean isAcquired = asyncPermits.tryAcquire(1000, TimeUnit.MILLISECONDS); + if (!isAcquired) { + String errMsg = String.format( + "Exceeded limit of max asynchronous calls: %d, " + + "please configure %s to adjust it.", + maxAsyncCalls, + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); + throw new AsyncCallLimitExceededException(errMsg); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot acquire a permit for connectionId {}", remoteId); + } } } } @@ -1502,11 +1525,12 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, throws IOException { final Call call = createCall(rpcKind, rpcRequest); call.setAlignmentContext(alignmentContext); - final Connection connection = getConnection(remoteId, call, serviceClass, - fallbackToSimpleAuth); + final Connection connection; try { - checkAsyncCall(); + checkAsyncCall(remoteId); + connection = getConnection(remoteId, call, serviceClass, + fallbackToSimpleAuth); try { connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { @@ -1518,7 +1542,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ioe.initCause(ie); throw ioe; } - } catch(Exception e) { + } catch (Exception e) { if (isAsynchronousMode()) { releaseAsyncCall(); } From 9dcb8b28496e0bf2ab78080deb5d9d31f03a189a Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Sun, 29 Dec 2024 13:48:14 +0800 Subject: [PATCH 2/4] fix unit test. --- .../src/main/java/org/apache/hadoop/ipc/Client.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 774b6c9a43e13..8a74b1de2bc1b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1472,6 +1472,7 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, private void checkAsyncCall(ConnectionId remoteId) throws IOException { if (isAsynchronousMode()) { + asyncCallCounter.incrementAndGet(); Semaphore asyncPermits = asyncCallCounters.computeIfAbsent(remoteId, id -> new Semaphore(maxAsyncCalls)); try { From a7c53bc3220a21c2938eb848b572473c56fddcc7 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Sun, 29 Dec 2024 14:02:20 +0800 Subject: [PATCH 3/4] make asyncCalllPermitsTimeoutMs configurable --- .../org/apache/hadoop/fs/CommonConfigurationKeys.java | 3 +++ .../src/main/java/org/apache/hadoop/ipc/Client.java | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 31b6654afc578..0368f6bffc9be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -355,6 +355,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY = "ipc.client.async.calls.max"; public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100; + public static final String IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_KEY = + "ipc.client.async.calls.permits.acquire.timeout.ms"; + public static final int IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_DEFAULT = 1000; public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed"; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 8a74b1de2bc1b..bcc0e49463303 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -204,6 +204,7 @@ public static Object getExternalHandler() { private final byte[] clientId; private final int maxAsyncCalls; private final AtomicInteger asyncCallCounter = new AtomicInteger(0); + private final int asyncCalllPermitsTimeoutMs; private final ConcurrentMap asyncCallCounters = new ConcurrentHashMap<>(); @@ -1382,6 +1383,9 @@ public Client(Class valueClass, Configuration conf, this.maxAsyncCalls = conf.getInt( CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); + this.asyncCalllPermitsTimeoutMs = conf.getInt( + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_KEY, + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_DEFAULT); } /** @@ -1479,8 +1483,8 @@ private void checkAsyncCall(ConnectionId remoteId) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Acquiring lock for connectionId {}", remoteId); } - // TODO timeout param configurable. - boolean isAcquired = asyncPermits.tryAcquire(1000, TimeUnit.MILLISECONDS); + boolean isAcquired = asyncPermits.tryAcquire(asyncCalllPermitsTimeoutMs, + TimeUnit.MILLISECONDS); if (!isAcquired) { String errMsg = String.format( "Exceeded limit of max asynchronous calls: %d, " + From f5b5cbdc5c3dbf2a85136e9187359d2130ee121e Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 13 Jan 2025 21:08:26 +0800 Subject: [PATCH 4/4] make asyncCallPermits as a field of Connection. --- .../java/org/apache/hadoop/ipc/Client.java | 62 +++++++++---------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index bcc0e49463303..077e0835d9bd2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -205,8 +205,6 @@ public static Object getExternalHandler() { private final int maxAsyncCalls; private final AtomicInteger asyncCallCounter = new AtomicInteger(0); private final int asyncCalllPermitsTimeoutMs; - private final ConcurrentMap asyncCallCounters = - new ConcurrentHashMap<>(); /** * set the ping interval value in configuration @@ -421,6 +419,7 @@ private class Connection extends Thread { // currently active calls private Hashtable calls = new Hashtable(); + private Semaphore asyncCallPermits = new Semaphore(maxAsyncCalls); private AtomicLong lastActivity = new AtomicLong();// last I/O activity time private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason @@ -507,9 +506,10 @@ private void touch() { * @param call to add * @return true if the call was added. */ - private synchronized boolean addCall(Call call) { + private synchronized boolean addCall(Call call) throws IOException { if (shouldCloseConnection.get()) return false; + checkAsyncCall(); calls.put(call.id, call); notify(); return true; @@ -1351,11 +1351,36 @@ private void cleanupCalls() { } private void releaseAsyncCallPermit() { - Semaphore asyncCallPermits = asyncCallCounters.get(remoteId); if (asyncCallPermits != null) { asyncCallPermits.release(1); } } + + private void checkAsyncCall() throws IOException { + if (isAsynchronousMode()) { + asyncCallCounter.incrementAndGet(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Acquiring async call permit for connectionId {}", this.remoteId); + } + boolean isAcquired = asyncCallPermits.tryAcquire(asyncCalllPermitsTimeoutMs, + TimeUnit.MILLISECONDS); + if (!isAcquired) { + String errMsg = String.format( + "Exceeded limit of max asynchronous calls: %d, " + + "please configure %s to adjust it.", + maxAsyncCalls, + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); + throw new AsyncCallLimitExceededException(errMsg); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted when acquiring async call permit for connectionId {}", remoteId); + } + throw new IOException(e); + } + } + } } /** @@ -1474,33 +1499,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, fallbackToSimpleAuth, alignmentContext); } - private void checkAsyncCall(ConnectionId remoteId) throws IOException { - if (isAsynchronousMode()) { - asyncCallCounter.incrementAndGet(); - Semaphore asyncPermits = asyncCallCounters.computeIfAbsent(remoteId, - id -> new Semaphore(maxAsyncCalls)); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Acquiring lock for connectionId {}", remoteId); - } - boolean isAcquired = asyncPermits.tryAcquire(asyncCalllPermitsTimeoutMs, - TimeUnit.MILLISECONDS); - if (!isAcquired) { - String errMsg = String.format( - "Exceeded limit of max asynchronous calls: %d, " + - "please configure %s to adjust it.", - maxAsyncCalls, - CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); - throw new AsyncCallLimitExceededException(errMsg); - } - } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot acquire a permit for connectionId {}", remoteId); - } - } - } - } - Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) @@ -1531,9 +1529,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, final Call call = createCall(rpcKind, rpcRequest); call.setAlignmentContext(alignmentContext); final Connection connection; - try { - checkAsyncCall(remoteId); connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth); try {