Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1556,15 +1556,27 @@ public void close() {
}

/**
* Return true if there's at least one connection establishment is currently underway
* Checks current connection states (connected and connecting) against the least loaded node algorithm to
* determine if a new connection should be established.
*
* @return true if a new connection should be established.
*/
private boolean isAnyNodeConnecting() {
private boolean shouldCreateNewConnection() {
// When the least loaded node algorithm is AT_LEAST_THREE, we require at least three connections to be established.
int requiredCount = leastLoadedNodeAlgorithm == LeastLoadedNodeAlgorithm.AT_LEAST_THREE ? 3 : 1;
boolean hasConnectingNode = false;
for (Node node : fetchNodes()) {
if (connectionStates.isConnecting(node.idString())) {
return true;
--requiredCount;
hasConnectingNode = true;
} else if (connectionStates.isConnected(node.idString())) {
--requiredCount;
}
if (hasConnectingNode && requiredCount <= 0) {
return false;
}
}
return false;
return true;
}

/**
Expand All @@ -1583,10 +1595,10 @@ private long maybeUpdate(long now, Node node) {
return defaultRequestTimeoutMs;
}

// If there's any connection establishment underway, wait until it completes. This prevents
// the client from unnecessarily connecting to additional nodes while a previous connection
// attempt has not been completed.
if (isAnyNodeConnecting()) {
// If there are already established and establishing connections satisfies the least loaded node algorithm,
// wait until they complete. This prevents the client from unnecessarily connecting to additional nodes
// before previous connection attempts to complete.
if (!shouldCreateNewConnection()) {
// Strictly the timeout we should return here is "connect timeout", but as we don't
// have such application level configuration, using reconnect backoff instead.
return reconnectBackoffMs;
Expand Down