Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,50 @@
* when the host should be retried (based on number of previous failed attempts).
* Class is immutable, a new copy of it should be created each time the state has to be changed.
*/
final class DeadHostState {
final class DeadHostState implements Comparable<DeadHostState> {

private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


private final int failedAttempts;
private final long deadUntilNanos;
private final TimeSupplier timeSupplier;

private DeadHostState() {
/**
* Build the initial dead state of a host. Useful when a working host stops functioning
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
*
* @param timeSupplier a way to supply the current time and allow for unit testing
*/
DeadHostState(TimeSupplier timeSupplier) {
this.failedAttempts = 1;
this.deadUntilNanos = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}

/**
* We keep track of how many times a certain node fails consecutively. The higher that number is the longer we will wait
* to retry that same node again. Minimum is 1 minute (for a node the only failed once), maximum is 30 minutes (for a node
* that failed many consecutive times).
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
*
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
*/
DeadHostState(DeadHostState previousDeadHostState) {
DeadHostState(DeadHostState previousDeadHostState, TimeSupplier timeSupplier) {
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS);
this.deadUntilNanos = System.nanoTime() + timeoutNanos;
this.deadUntilNanos = timeSupplier.nanoTime() + timeoutNanos;
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = timeSupplier;
}

/**
* Indicates whether it's time to retry to failed host or not.
*
* @return true if the host should be retried, false otherwise
*/
boolean shallBeRetried() {
return timeSupplier.nanoTime() - deadUntilNanos > 0;
}

/**
Expand All @@ -61,11 +80,35 @@ long getDeadUntilNanos() {
return deadUntilNanos;
}

int getFailedAttempts() {
return failedAttempts;
}

@Override
public int compareTo(DeadHostState other) {
return Long.compare(deadUntilNanos, other.deadUntilNanos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

}

@Override
public String toString() {
return "DeadHostState{" +
"failedAttempts=" + failedAttempts +
", deadUntilNanos=" + deadUntilNanos +
'}';
}

/**
* Time supplier that makes timing aspects pluggable to ease testing
*/
interface TimeSupplier {

TimeSupplier DEFAULT = new TimeSupplier() {
@Override
public long nanoTime() {
return System.nanoTime();
}
};

long nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;

import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
Expand All @@ -72,7 +73,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLHandshakeException;

/**
* Client that connects to an Elasticsearch cluster through HTTP.
Expand Down Expand Up @@ -457,18 +457,18 @@ private HostTuple<Iterator<HttpHost>> nextHost() {
do {
Set<HttpHost> filteredHosts = new HashSet<>(hostTuple.hosts);
for (Map.Entry<HttpHost, DeadHostState> entry : blacklist.entrySet()) {
if (System.nanoTime() - entry.getValue().getDeadUntilNanos() < 0) {
if (entry.getValue().shallBeRetried() == false) {
filteredHosts.remove(entry.getKey());
}
}
if (filteredHosts.isEmpty()) {
//last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried
//last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
List<Map.Entry<HttpHost, DeadHostState>> sortedHosts = new ArrayList<>(blacklist.entrySet());
if (sortedHosts.size() > 0) {
Collections.sort(sortedHosts, new Comparator<Map.Entry<HttpHost, DeadHostState>>() {
@Override
public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, DeadHostState> o2) {
return Long.compare(o1.getValue().getDeadUntilNanos(), o2.getValue().getDeadUntilNanos());
return o1.getValue().compareTo(o2.getValue());
}
});
HttpHost deadHost = sortedHosts.get(0).getKey();
Expand Down Expand Up @@ -499,14 +499,15 @@ private void onResponse(HttpHost host) {
* Called after each failed attempt.
* Receives as an argument the host that was used for the failed attempt.
*/
private void onFailure(HttpHost host) throws IOException {
private void onFailure(HttpHost host) {
while(true) {
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE);
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, new DeadHostState(DeadHostState.TimeSupplier.DEFAULT));
if (previousDeadHostState == null) {
logger.debug("added host [" + host + "] to blacklist");
break;
}
if (blacklist.replace(host, previousDeadHostState, new DeadHostState(previousDeadHostState))) {
if (blacklist.replace(host, previousDeadHostState,
new DeadHostState(previousDeadHostState, DeadHostState.TimeSupplier.DEFAULT))) {
logger.debug("updated host [" + host + "] already in blacklist");
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public class DeadHostStateTests extends RestClientTestCase {

private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};

public void testInitialDeadHostStateDefaultTimeSupplier() {
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
long currentTime = System.nanoTime();
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(currentTime));
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
}

public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous, DeadHostState.TimeSupplier.DEFAULT);
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(previous.getDeadUntilNanos()));
assertThat(deadHostState.getFailedAttempts(), equalTo(previous.getFailedAttempts() + 1));
previous = deadHostState;
}
}

public void testCompareToDefaultTimeSupplier() {
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
for (int i = 0; i < numObjects; i++) {
if (i == 0) {
deadHostStates[i] = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
} else {
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1], DeadHostState.TimeSupplier.DEFAULT);
}
}
for (int k = 1; k < deadHostStates.length; k++) {
assertThat(deadHostStates[k - 1].getDeadUntilNanos(), lessThan(deadHostStates[k].getDeadUntilNanos()));
assertThat(deadHostStates[k - 1], lessThan(deadHostStates[k]));
}
}

public void testShallBeRetried() {
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
DeadHostState deadHostState = null;
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
timeSupplier.nanoTime = 0;
if (i == 0) {
deadHostState = new DeadHostState(timeSupplier);
} else {
deadHostState = new DeadHostState(deadHostState, timeSupplier);
}
for (int j = 0; j < expectedTimeoutSecond; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
assertThat(deadHostState.shallBeRetried(), is(false));
}
int iters = randomIntBetween(5, 30);
for (int j = 0; j < iters; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
assertThat(deadHostState.shallBeRetried(), is(true));
}
}
}

public void testDeadHostStateTimeouts() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels to me like this should be part of testDeadHostStateFromPrevious. If you really want to have a version of the test that use System.nanoTime I'd name it with DefaultTimeProvder in the method name and name the method that uses the ConfigurableTimeSupplier more like testDeadHostStateFromPrevious.

ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
zeroTimeSupplier.nanoTime = 0L;
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
previous = new DeadHostState(previous, zeroTimeSupplier);
}
//check that from here on the timeout does not increase
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous, zeroTimeSupplier);
assertThat(TimeUnit.NANOSECONDS.toSeconds(deadHostState.getDeadUntilNanos()),
equalTo(EXPECTED_TIMEOUTS_SECONDS[EXPECTED_TIMEOUTS_SECONDS.length - 1]));
previous = deadHostState;
}
}

private static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {

long nanoTime;

@Override
public long nanoTime() {
return nanoTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {

@Before
@SuppressWarnings("unchecked")
public void createRestClient() throws IOException {
public void createRestClient() {
httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
Expand Down Expand Up @@ -160,17 +160,6 @@ public void shutdownExec() {
exec.shutdown();
}

public void testNullPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}

/**
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
*/
Expand All @@ -196,33 +185,6 @@ public void testInternalHttpRequest() throws Exception {
}
}

public void testSetHosts() throws IOException {
try {
restClient.setHosts((HttpHost[]) null);
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try {
restClient.setHosts();
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try {
restClient.setHosts((HttpHost) null);
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
try {
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
}

/**
* End to end test for ok status codes
*/
Expand Down Expand Up @@ -289,7 +251,7 @@ public void testErrorStatusCodes() throws IOException {
}
}

public void testIOExceptions() throws IOException {
public void testIOExceptions() {
for (String method : getHttpMethods()) {
//IOExceptions should be let bubble up
try {
Expand Down
Loading