diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index db3c283fbe643..1743003d4917e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -1556,15 +1556,27 @@ public void close() { } /** - * Return true if there's at least one connection establishment is currently underway + * Checks current connection states (connected and connecting) against the least loaded node algorithm to + * determine if a new connection should be established. + * + * @return true if a new connection should be established. */ - private boolean isAnyNodeConnecting() { + private boolean shouldCreateNewConnection() { + // When the least loaded node algorithm is AT_LEAST_THREE, we require at least three connections to be established. + int requiredCount = leastLoadedNodeAlgorithm == LeastLoadedNodeAlgorithm.AT_LEAST_THREE ? 3 : 1; + boolean hasConnectingNode = false; for (Node node : fetchNodes()) { if (connectionStates.isConnecting(node.idString())) { - return true; + --requiredCount; + hasConnectingNode = true; + } else if (connectionStates.isConnected(node.idString())) { + --requiredCount; + } + if (hasConnectingNode && requiredCount <= 0) { + return false; } } - return false; + return true; } /** @@ -1583,10 +1595,10 @@ private long maybeUpdate(long now, Node node) { return defaultRequestTimeoutMs; } - // If there's any connection establishment underway, wait until it completes. This prevents - // the client from unnecessarily connecting to additional nodes while a previous connection - // attempt has not been completed. - if (isAnyNodeConnecting()) { + // If there are already established and establishing connections satisfies the least loaded node algorithm, + // wait until they complete. This prevents the client from unnecessarily connecting to additional nodes + // before previous connection attempts to complete. + if (!shouldCreateNewConnection()) { // Strictly the timeout we should return here is "connect timeout", but as we don't // have such application level configuration, using reconnect backoff instead. return reconnectBackoffMs;