Skip to content

Commit e1740b8

Browse files
committed
REST client: hosts marked dead for the first time should not be immediately retried (#29230)
This was the plan from day one but due to a silly bug nodes were immediately retried after they were marked as dead for the first time. From the second time on, the expected backoff was applied.
1 parent 462068e commit e1740b8

File tree

5 files changed

+223
-58
lines changed

5 files changed

+223
-58
lines changed

client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,31 +26,50 @@
2626
* when the host should be retried (based on number of previous failed attempts).
2727
* Class is immutable, a new copy of it should be created each time the state has to be changed.
2828
*/
29-
final class DeadHostState {
29+
final class DeadHostState implements Comparable<DeadHostState> {
3030

3131
private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
3232
private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
3333

34-
static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState();
35-
3634
private final int failedAttempts;
3735
private final long deadUntilNanos;
36+
private final TimeSupplier timeSupplier;
3837

39-
private DeadHostState() {
38+
/**
39+
* Build the initial dead state of a host. Useful when a working host stops functioning
40+
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
41+
*
42+
* @param timeSupplier a way to supply the current time and allow for unit testing
43+
*/
44+
DeadHostState(TimeSupplier timeSupplier) {
4045
this.failedAttempts = 1;
41-
this.deadUntilNanos = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
46+
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
47+
this.timeSupplier = timeSupplier;
4248
}
4349

4450
/**
45-
* We keep track of how many times a certain node fails consecutively. The higher that number is the longer we will wait
46-
* to retry that same node again. Minimum is 1 minute (for a node the only failed once), maximum is 30 minutes (for a node
47-
* that failed many consecutive times).
51+
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
52+
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
53+
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
54+
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
55+
*
56+
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
4857
*/
49-
DeadHostState(DeadHostState previousDeadHostState) {
58+
DeadHostState(DeadHostState previousDeadHostState, TimeSupplier timeSupplier) {
5059
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
5160
MAX_CONNECTION_TIMEOUT_NANOS);
52-
this.deadUntilNanos = System.nanoTime() + timeoutNanos;
61+
this.deadUntilNanos = timeSupplier.nanoTime() + timeoutNanos;
5362
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
63+
this.timeSupplier = timeSupplier;
64+
}
65+
66+
/**
67+
* Indicates whether it's time to retry to failed host or not.
68+
*
69+
* @return true if the host should be retried, false otherwise
70+
*/
71+
boolean shallBeRetried() {
72+
return timeSupplier.nanoTime() - deadUntilNanos > 0;
5473
}
5574

5675
/**
@@ -61,11 +80,35 @@ long getDeadUntilNanos() {
6180
return deadUntilNanos;
6281
}
6382

83+
int getFailedAttempts() {
84+
return failedAttempts;
85+
}
86+
87+
@Override
88+
public int compareTo(DeadHostState other) {
89+
return Long.compare(deadUntilNanos, other.deadUntilNanos);
90+
}
91+
6492
@Override
6593
public String toString() {
6694
return "DeadHostState{" +
6795
"failedAttempts=" + failedAttempts +
6896
", deadUntilNanos=" + deadUntilNanos +
6997
'}';
7098
}
99+
100+
/**
101+
* Time supplier that makes timing aspects pluggable to ease testing
102+
*/
103+
interface TimeSupplier {
104+
105+
TimeSupplier DEFAULT = new TimeSupplier() {
106+
@Override
107+
public long nanoTime() {
108+
return System.nanoTime();
109+
}
110+
};
111+
112+
long nanoTime();
113+
}
71114
}

client/rest/src/main/java/org/elasticsearch/client/RestClient.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
4848
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
4949

50+
import javax.net.ssl.SSLHandshakeException;
5051
import java.io.Closeable;
5152
import java.io.IOException;
5253
import java.net.SocketTimeoutException;
@@ -72,7 +73,6 @@
7273
import java.util.concurrent.TimeUnit;
7374
import java.util.concurrent.atomic.AtomicInteger;
7475
import java.util.concurrent.atomic.AtomicReference;
75-
import javax.net.ssl.SSLHandshakeException;
7676

7777
/**
7878
* Client that connects to an Elasticsearch cluster through HTTP.
@@ -457,18 +457,18 @@ private HostTuple<Iterator<HttpHost>> nextHost() {
457457
do {
458458
Set<HttpHost> filteredHosts = new HashSet<>(hostTuple.hosts);
459459
for (Map.Entry<HttpHost, DeadHostState> entry : blacklist.entrySet()) {
460-
if (System.nanoTime() - entry.getValue().getDeadUntilNanos() < 0) {
460+
if (entry.getValue().shallBeRetried() == false) {
461461
filteredHosts.remove(entry.getKey());
462462
}
463463
}
464464
if (filteredHosts.isEmpty()) {
465-
//last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried
465+
//last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
466466
List<Map.Entry<HttpHost, DeadHostState>> sortedHosts = new ArrayList<>(blacklist.entrySet());
467467
if (sortedHosts.size() > 0) {
468468
Collections.sort(sortedHosts, new Comparator<Map.Entry<HttpHost, DeadHostState>>() {
469469
@Override
470470
public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, DeadHostState> o2) {
471-
return Long.compare(o1.getValue().getDeadUntilNanos(), o2.getValue().getDeadUntilNanos());
471+
return o1.getValue().compareTo(o2.getValue());
472472
}
473473
});
474474
HttpHost deadHost = sortedHosts.get(0).getKey();
@@ -499,14 +499,15 @@ private void onResponse(HttpHost host) {
499499
* Called after each failed attempt.
500500
* Receives as an argument the host that was used for the failed attempt.
501501
*/
502-
private void onFailure(HttpHost host) throws IOException {
502+
private void onFailure(HttpHost host) {
503503
while(true) {
504-
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE);
504+
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, new DeadHostState(DeadHostState.TimeSupplier.DEFAULT));
505505
if (previousDeadHostState == null) {
506506
logger.debug("added host [" + host + "] to blacklist");
507507
break;
508508
}
509-
if (blacklist.replace(host, previousDeadHostState, new DeadHostState(previousDeadHostState))) {
509+
if (blacklist.replace(host, previousDeadHostState,
510+
new DeadHostState(previousDeadHostState, DeadHostState.TimeSupplier.DEFAULT))) {
510511
logger.debug("updated host [" + host + "] already in blacklist");
511512
break;
512513
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
import static org.hamcrest.MatcherAssert.assertThat;
25+
import static org.hamcrest.Matchers.equalTo;
26+
import static org.hamcrest.Matchers.greaterThan;
27+
import static org.hamcrest.Matchers.is;
28+
import static org.hamcrest.Matchers.lessThan;
29+
30+
public class DeadHostStateTests extends RestClientTestCase {
31+
32+
private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};
33+
34+
public void testInitialDeadHostStateDefaultTimeSupplier() {
35+
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
36+
long currentTime = System.nanoTime();
37+
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(currentTime));
38+
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
39+
}
40+
41+
public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
42+
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
43+
int iters = randomIntBetween(5, 30);
44+
for (int i = 0; i < iters; i++) {
45+
DeadHostState deadHostState = new DeadHostState(previous, DeadHostState.TimeSupplier.DEFAULT);
46+
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(previous.getDeadUntilNanos()));
47+
assertThat(deadHostState.getFailedAttempts(), equalTo(previous.getFailedAttempts() + 1));
48+
previous = deadHostState;
49+
}
50+
}
51+
52+
public void testCompareToDefaultTimeSupplier() {
53+
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
54+
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
55+
for (int i = 0; i < numObjects; i++) {
56+
if (i == 0) {
57+
deadHostStates[i] = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
58+
} else {
59+
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1], DeadHostState.TimeSupplier.DEFAULT);
60+
}
61+
}
62+
for (int k = 1; k < deadHostStates.length; k++) {
63+
assertThat(deadHostStates[k - 1].getDeadUntilNanos(), lessThan(deadHostStates[k].getDeadUntilNanos()));
64+
assertThat(deadHostStates[k - 1], lessThan(deadHostStates[k]));
65+
}
66+
}
67+
68+
public void testShallBeRetried() {
69+
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
70+
DeadHostState deadHostState = null;
71+
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
72+
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
73+
timeSupplier.nanoTime = 0;
74+
if (i == 0) {
75+
deadHostState = new DeadHostState(timeSupplier);
76+
} else {
77+
deadHostState = new DeadHostState(deadHostState, timeSupplier);
78+
}
79+
for (int j = 0; j < expectedTimeoutSecond; j++) {
80+
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
81+
assertThat(deadHostState.shallBeRetried(), is(false));
82+
}
83+
int iters = randomIntBetween(5, 30);
84+
for (int j = 0; j < iters; j++) {
85+
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
86+
assertThat(deadHostState.shallBeRetried(), is(true));
87+
}
88+
}
89+
}
90+
91+
public void testDeadHostStateTimeouts() {
92+
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
93+
zeroTimeSupplier.nanoTime = 0L;
94+
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
95+
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
96+
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
97+
previous = new DeadHostState(previous, zeroTimeSupplier);
98+
}
99+
//check that from here on the timeout does not increase
100+
int iters = randomIntBetween(5, 30);
101+
for (int i = 0; i < iters; i++) {
102+
DeadHostState deadHostState = new DeadHostState(previous, zeroTimeSupplier);
103+
assertThat(TimeUnit.NANOSECONDS.toSeconds(deadHostState.getDeadUntilNanos()),
104+
equalTo(EXPECTED_TIMEOUTS_SECONDS[EXPECTED_TIMEOUTS_SECONDS.length - 1]));
105+
previous = deadHostState;
106+
}
107+
}
108+
109+
private static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
110+
111+
long nanoTime;
112+
113+
@Override
114+
public long nanoTime() {
115+
return nanoTime;
116+
}
117+
}
118+
}

client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
101101

102102
@Before
103103
@SuppressWarnings("unchecked")
104-
public void createRestClient() throws IOException {
104+
public void createRestClient() {
105105
httpClient = mock(CloseableHttpAsyncClient.class);
106106
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
107107
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
@@ -160,17 +160,6 @@ public void shutdownExec() {
160160
exec.shutdown();
161161
}
162162

163-
public void testNullPath() throws IOException {
164-
for (String method : getHttpMethods()) {
165-
try {
166-
restClient.performRequest(method, null);
167-
fail("path set to null should fail!");
168-
} catch (NullPointerException e) {
169-
assertEquals("path must not be null", e.getMessage());
170-
}
171-
}
172-
}
173-
174163
/**
175164
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
176165
*/
@@ -196,33 +185,6 @@ public void testInternalHttpRequest() throws Exception {
196185
}
197186
}
198187

199-
public void testSetHosts() throws IOException {
200-
try {
201-
restClient.setHosts((HttpHost[]) null);
202-
fail("setHosts should have failed");
203-
} catch (IllegalArgumentException e) {
204-
assertEquals("hosts must not be null nor empty", e.getMessage());
205-
}
206-
try {
207-
restClient.setHosts();
208-
fail("setHosts should have failed");
209-
} catch (IllegalArgumentException e) {
210-
assertEquals("hosts must not be null nor empty", e.getMessage());
211-
}
212-
try {
213-
restClient.setHosts((HttpHost) null);
214-
fail("setHosts should have failed");
215-
} catch (NullPointerException e) {
216-
assertEquals("host cannot be null", e.getMessage());
217-
}
218-
try {
219-
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
220-
fail("setHosts should have failed");
221-
} catch (NullPointerException e) {
222-
assertEquals("host cannot be null", e.getMessage());
223-
}
224-
}
225-
226188
/**
227189
* End to end test for ok status codes
228190
*/
@@ -289,7 +251,7 @@ public void testErrorStatusCodes() throws IOException {
289251
}
290252
}
291253

292-
public void testIOExceptions() throws IOException {
254+
public void testIOExceptions() {
293255
for (String method : getHttpMethods()) {
294256
//IOExceptions should be let bubble up
295257
try {

0 commit comments

Comments
 (0)