Skip to content

Commit

Permalink
Merge pull request #771 from Vlatombe/websocket-connection-logic-rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Vlatombe authored Nov 5, 2024
2 parents 043ef22 + 5e79c6f commit 92c105e
Showing 1 changed file with 111 additions and 34 deletions.
145 changes: 111 additions & 34 deletions src/main/java/hudson/remoting/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ public Thread newThread(@NonNull final Runnable r) {

private Duration noReconnectAfter;

private Instant firstAttempt;

/**
* Determines whether the socket will have {@link Socket#setKeepAlive(boolean)} set or not.
*
Expand Down Expand Up @@ -593,7 +591,7 @@ public void run() {
}

@SuppressFBWarnings(
value = {"REC_CATCH_EXCEPTION", "URLCONNECTION_SSRF_FD"},
value = {"REC_CATCH_EXCEPTION"},
justification = "checked exceptions were a mistake to begin with; connecting to Jenkins from agent")
private void runWebSocket() {
try {
Expand Down Expand Up @@ -785,12 +783,20 @@ public void closeRead() throws IOException {
client.getProperties().put(ClientProperties.SSL_ENGINE_CONFIGURATOR, sslEngineConfigurator);
}
}
container.connectToServer(
new AgentEndpoint(),
ClientEndpointConfig.Builder.create()
.configurator(headerHandler)
.build(),
URI.create(wsUrl + "wsagents/"));
if (!succeedsWithRetries(this::pingSuccessful)) {
return;
}
if (!succeedsWithRetries(() -> {
container.connectToServer(
new AgentEndpoint(),
ClientEndpointConfig.Builder.create()
.configurator(headerHandler)
.build(),
URI.create(wsUrl + "wsagents/"));
return true;
})) {
return;
}
while (ch.get() == null) {
Thread.sleep(100);
}
Expand All @@ -801,38 +807,109 @@ public void closeRead() throws IOException {
if (noReconnect) {
return;
}
firstAttempt = Instant.now();
events.onDisconnect();
while (true) {
// TODO refactor various sleep statements into a common method
if (Util.shouldBailOut(firstAttempt, noReconnectAfter)) {
events.status("Bailing out after " + DurationFormatter.format(noReconnectAfter));
return;
}
TimeUnit.SECONDS.sleep(10);
// Unlike JnlpAgentEndpointResolver, we do not use $jenkins/tcpSlaveAgentListener/, as that will be
// a 404 if the TCP port is disabled.
URL ping = new URL(hudsonUrl, "login");
try {
HttpURLConnection conn = (HttpURLConnection) ping.openConnection();
int status = conn.getResponseCode();
conn.disconnect();
if (status == 200) {
break;
} else {
events.status(ping + " is not ready: " + status);
}
} catch (IOException x) {
events.status(ping + " is not ready", x);
}
}
reconnect();
}
} catch (Exception e) {
events.error(e);
}
}

/**
 * Evaluates a condition repeatedly, waiting between attempts, until it succeeds or the retry
 * budget derived from {@code noReconnectAfter} is exhausted.
 *
 * <p>A {@code false} result or any exception other than {@link InterruptedException} counts as a
 * failed attempt; exceptions are reported through {@code events.status} and the condition is tried again
 * after the pause performed by {@link ExponentialRetry#next}.
 *
 * @param condition the condition to attempt to succeed with backoff between attempts
 * @return true if the condition succeeded, false if the condition kept failing and the timeout was reached
 * @throws InterruptedException if the thread was interrupted while waiting between attempts, or
 *     if the condition itself reported an interruption.
 */
private boolean succeedsWithRetries(java.util.concurrent.Callable<Boolean> condition) throws InterruptedException {
    var exponentialRetry = new ExponentialRetry(noReconnectAfter);
    while (exponentialRetry != null) {
        try {
            if (condition.call()) {
                return true;
            }
        } catch (InterruptedException x) {
            // An interruption is a request to stop, not a failed attempt. Swallowing it here would
            // also lose the interrupt status, so the following sleep would not notice it either.
            throw x;
        } catch (Exception x) {
            events.status("Failed to connect: " + x.getMessage());
        }
        // next() returns null once the timeout is exceeded, which ends the loop.
        exponentialRetry = exponentialRetry.next(events);
    }
    return false;
}

/**
 * Probes the {@code login} page relative to {@code hudsonUrl} with a single HTTP request.
 *
 * <p>Any status other than 200, as well as any {@link IOException} raised while connecting, is
 * reported through {@code events.status} and treated as "not ready".
 *
 * @return {@code true} if the response status was 200, {@code false} otherwise
 * @throws MalformedURLException if the ping URL cannot be built from {@code hudsonUrl}
 */
@SuppressFBWarnings(
        value = {"URLCONNECTION_SSRF_FD"},
        justification = "url is provided by the user, and we are trying to connect to it")
private Boolean pingSuccessful() throws MalformedURLException {
    // Unlike JnlpAgentEndpointResolver, we do not use $jenkins/tcpSlaveAgentListener/, as that will be
    // a 404 if the TCP port is disabled.
    URL ping = new URL(hudsonUrl, "login");
    HttpURLConnection conn = null;
    try {
        conn = (HttpURLConnection) ping.openConnection();
        int status = conn.getResponseCode();
        if (status == 200) {
            return true;
        } else {
            events.status(ping + " is not ready: " + status);
        }
    } catch (IOException x) {
        events.status(ping + " is not ready", x);
    } finally {
        // Release the connection on every path, including when getResponseCode() throws.
        if (conn != null) {
            conn.disconnect();
        }
    }
    return false;
}

/**
 * Immutable snapshot of a retry schedule with a growing, capped delay.
 *
 * <p>Each step derives its delay from the previous one as
 * {@code min(maxDelay, previousDelay * factor + incrementDelay)}. All steps of one schedule share
 * the same {@code beginning} instant, against which the timeout is evaluated.
 */
private static class ExponentialRetry {
    final int factor;
    final Instant beginning;
    final Duration delay;
    final Duration timeout;
    final Duration incrementDelay;
    final Duration maxDelay;

    /**
     * Starts a schedule with the defaults: no initial delay, factor 2, 1 second increment and
     * a 10 second cap on the delay.
     *
     * @param timeout overall time budget, passed as-is to {@code Util.shouldBailOut}
     */
    ExponentialRetry(Duration timeout) {
        this(Duration.ofSeconds(0), timeout, 2, Duration.ofSeconds(1), Duration.ofSeconds(10));
    }

    /**
     * Starts a schedule with explicit parameters; the schedule begins at the time of construction.
     *
     * @param initialDelay delay recorded for this first step
     * @param timeout overall time budget, passed as-is to {@code Util.shouldBailOut}
     * @param factor multiplier applied to the previous delay at each step
     * @param incrementDelay amount added after multiplying
     * @param maxDelay upper bound for the delay of any later step
     */
    ExponentialRetry(
            Duration initialDelay, Duration timeout, int factor, Duration incrementDelay, Duration maxDelay) {
        this.factor = factor;
        this.timeout = timeout;
        this.incrementDelay = incrementDelay;
        this.maxDelay = maxDelay;
        this.delay = initialDelay;
        this.beginning = Instant.now();
    }

    /**
     * Builds the step that follows {@code previous}: same schedule parameters and start instant,
     * with the delay grown and capped at {@code maxDelay}.
     *
     * @param previous the step to derive from
     */
    ExponentialRetry(ExponentialRetry previous) {
        this.beginning = previous.beginning;
        this.timeout = previous.timeout;
        this.factor = previous.factor;
        this.incrementDelay = previous.incrementDelay;
        this.maxDelay = previous.maxDelay;
        Duration grown = previous.delay.multipliedBy(previous.factor).plus(previous.incrementDelay);
        this.delay = min(previous.maxDelay, grown);
    }

    /** Returns the shorter of the two durations ({@code b} when they are equal). */
    private static Duration min(Duration a, Duration b) {
        if (a.compareTo(b) < 0) {
            return a;
        }
        return b;
    }

    /** Reports whether {@code Util.shouldBailOut} considers the timeout reached for this schedule. */
    boolean timeoutExceeded() {
        return Util.shouldBailOut(beginning, timeout);
    }

    /**
     * Advances the schedule by one step, sleeping for the new step's delay.
     *
     * @param events listener that receives the "waiting" or "bailing out" status message
     * @return the next step after sleeping, or {@code null} if the timeout has been exceeded
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    ExponentialRetry next(EngineListenerSplitter events) throws InterruptedException {
        final ExponentialRetry following = new ExponentialRetry(this);
        if (following.timeoutExceeded()) {
            events.status("Bailing out after " + DurationFormatter.format(following.timeout));
            return null;
        }
        events.status("Waiting " + DurationFormatter.format(following.delay) + " before retry");
        Thread.sleep(following.delay.toMillis());
        return following;
    }
}

private void reconnect() {
try {
events.status("Performing onReconnect operation.");
Expand Down Expand Up @@ -862,7 +939,7 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) {

try {
boolean first = true;
firstAttempt = Instant.now();
var firstAttempt = Instant.now();
while (true) {
if (first) {
first = false;
Expand Down

0 comments on commit 92c105e

Please sign in to comment.