Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@
* when the host should be retried (based on number of previous failed attempts).
* Class is immutable, a new copy of it should be created each time the state has to be changed.
*/
final class DeadHostState {
final class DeadHostState implements Comparable<DeadHostState> {

private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


private final int failedAttempts;
private final long deadUntilNanos;
private final TimeSupplier timeSupplier;

DeadHostState() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is worth having a no-arg ctor at all. Maybe remove it entirely so the called has to think.

this(TimeSupplier.DEFAULT);
}

private DeadHostState() {
DeadHostState(TimeSupplier timeSupplier) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth javadocs on this to communicate that it is for building the initial dead state.

this.failedAttempts = 1;
this.deadUntilNanos = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.deadUntilNanos = timeSupplier.getNanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}

/**
Expand All @@ -47,10 +51,19 @@ private DeadHostState() {
* that failed many consecutive times).
*/
DeadHostState(DeadHostState previousDeadHostState) {
this(previousDeadHostState, TimeSupplier.DEFAULT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it isn't worth having the the TimeSupplier.DEFAULT versions. The caller should use TimeSupplier.DEFAULT.

}

DeadHostState(DeadHostState previousDeadHostState, TimeSupplier timeSupplier) {
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS);
this.deadUntilNanos = System.nanoTime() + timeoutNanos;
this.deadUntilNanos = timeSupplier.getNanoTime() + timeoutNanos;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still call it nanoTime.

this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = timeSupplier;
}

boolean shallBeRetried() {
return timeSupplier.getNanoTime() - deadUntilNanos > 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh! I'd figured you'd pass the TimeSupplier in to every method that had to call it. We wouldn't keep references to it around. But that does change the method calls a ton. I think this is a fine way to do it!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is probably worth having javadoc for this though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure initially, but I think this is good as it allows to isolate all the timing related aspects to DeadHostState, using the same instance of timesupplier. it feels weird to have to pass in the time supplier here too, or maybe I could do that. Would you prefer to change that?

}

/**
Expand All @@ -61,11 +74,28 @@ long getDeadUntilNanos() {
return deadUntilNanos;
}

int getFailedAttempts() {
return failedAttempts;
}

@Override
public int compareTo(DeadHostState other) {
return Long.compare(deadUntilNanos, other.deadUntilNanos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

}

@Override
public String toString() {
return "DeadHostState{" +
"failedAttempts=" + failedAttempts +
", deadUntilNanos=" + deadUntilNanos +
'}';
}

static class TimeSupplier {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this an interface? I think it'd feel cleaner that way.

And it is probably worth javadoc to explain that how you fetch time is pluggable for testing. Lots of folks are used to this but anyone that hasn't had to write unit tests for code involving timeouts and such won't be used to it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, I miss default methods here :)

private static final TimeSupplier DEFAULT = new TimeSupplier();

long getNanoTime() {
return System.nanoTime();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;

import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
Expand All @@ -72,7 +73,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLHandshakeException;

/**
* Client that connects to an Elasticsearch cluster through HTTP.
Expand Down Expand Up @@ -457,18 +457,18 @@ private HostTuple<Iterator<HttpHost>> nextHost() {
do {
Set<HttpHost> filteredHosts = new HashSet<>(hostTuple.hosts);
for (Map.Entry<HttpHost, DeadHostState> entry : blacklist.entrySet()) {
if (System.nanoTime() - entry.getValue().getDeadUntilNanos() < 0) {
if (entry.getValue().shallBeRetried() == false) {
filteredHosts.remove(entry.getKey());
}
}
if (filteredHosts.isEmpty()) {
//last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried
//last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
List<Map.Entry<HttpHost, DeadHostState>> sortedHosts = new ArrayList<>(blacklist.entrySet());
if (sortedHosts.size() > 0) {
Collections.sort(sortedHosts, new Comparator<Map.Entry<HttpHost, DeadHostState>>() {
@Override
public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, DeadHostState> o2) {
return Long.compare(o1.getValue().getDeadUntilNanos(), o2.getValue().getDeadUntilNanos());
return o1.getValue().compareTo(o2.getValue());
}
});
HttpHost deadHost = sortedHosts.get(0).getKey();
Expand Down Expand Up @@ -499,9 +499,9 @@ private void onResponse(HttpHost host) {
* Called after each failed attempt.
* Receives as an argument the host that was used for the failed attempt.
*/
private void onFailure(HttpHost host) throws IOException {
private void onFailure(HttpHost host) {
while(true) {
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE);
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, new DeadHostState());
if (previousDeadHostState == null) {
logger.debug("added host [" + host + "] to blacklist");
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public class DeadHostStateTests extends RestClientTestCase {

private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};

public void testInitialDeadHostState() {
DeadHostState deadHostState = new DeadHostState();
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(System.nanoTime()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep away from using the default one for all the unit tests just in case of funny pauses and stuff. Maybe one, very small, very paranoid check that use nanoTime is col though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case I'd move the System.nanoTime call to before building the DeadHostState. That way if there is a super long pause due to the CI machine swapping or something terrible we still won't mind.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted some smoke test that with the right time supplier we do the right thing. I think this way should be ok as we are just testing the difference between a recent object and a previous one.

assertThat(deadHostState.getFailedAttempts(), equalTo(1));
}

public void testDeadHostStateFromPrevious() {
DeadHostState previous = new DeadHostState();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use a provider for the time then you can be quite explicit here. I think that is worth doing.

int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous);
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(previous.getDeadUntilNanos()));
assertThat(deadHostState.getFailedAttempts(), equalTo(previous.getFailedAttempts() + 1));
previous = deadHostState;
}
}

public void testShallBeRetried() {
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
DeadHostState deadHostState = null;
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
timeSupplier.nanoTime = 0;
if (i == 0) {
deadHostState = new DeadHostState(timeSupplier);
} else {
deadHostState = new DeadHostState(deadHostState, timeSupplier);
}
for (int j = 0; j < expectedTimeoutSecond; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
assertThat(deadHostState.shallBeRetried(), is(false));
}
int iters = randomIntBetween(5, 30);
for (int j = 0; j < iters; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
assertThat(deadHostState.shallBeRetried(), is(true));
}
}
}

public void testDeadHostStateTimeouts() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels to me like this should be part of testDeadHostStateFromPrevious. If you really want to have a version of the test that use System.nanoTime I'd name it with DefaultTimeProvder in the method name and name the method that uses the ConfigurableTimeSupplier more like testDeadHostStateFromPrevious.

ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
zeroTimeSupplier.nanoTime = 0L;
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
previous = new DeadHostState(previous, zeroTimeSupplier);
}
//check that from here on the timeout does not increase
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous, zeroTimeSupplier);
assertThat(TimeUnit.NANOSECONDS.toSeconds(deadHostState.getDeadUntilNanos()),
equalTo(EXPECTED_TIMEOUTS_SECONDS[EXPECTED_TIMEOUTS_SECONDS.length - 1]));
previous = deadHostState;
}
}

public void testCompareTo() {
int numObjects = randomIntBetween(5, 30);
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
int failedAttempts = randomIntBetween(1, 5);
for (int i = 0; i < failedAttempts; i++) {
for (int j = 0; j < numObjects; j++) {
if (i == 0) {
deadHostStates[j] = new DeadHostState();
} else {
deadHostStates[j] = new DeadHostState(deadHostStates[j]);
}
}
for (int k = 1; k < deadHostStates.length; k++) {
assertThat(deadHostStates[k - 1].getDeadUntilNanos(), lessThan(deadHostStates[k].getDeadUntilNanos()));
assertThat(deadHostStates[k - 1], lessThan(deadHostStates[k]));
}
}
}

private static class ConfigurableTimeSupplier extends DeadHostState.TimeSupplier {

long nanoTime;

@Override
long getNanoTime() {
return nanoTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {

@Before
@SuppressWarnings("unchecked")
public void createRestClient() throws IOException {
public void createRestClient() {
httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
Expand Down Expand Up @@ -160,17 +160,6 @@ public void shutdownExec() {
exec.shutdown();
}

public void testNullPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}

/**
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
*/
Expand All @@ -196,33 +185,6 @@ public void testInternalHttpRequest() throws Exception {
}
}

public void testSetHosts() throws IOException {
try {
restClient.setHosts((HttpHost[]) null);
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try {
restClient.setHosts();
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try {
restClient.setHosts((HttpHost) null);
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
try {
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
}

/**
* End to end test for ok status codes
*/
Expand Down Expand Up @@ -289,7 +251,7 @@ public void testErrorStatusCodes() throws IOException {
}
}

public void testIOExceptions() throws IOException {
public void testIOExceptions() {
for (String method : getHttpMethods()) {
//IOExceptions should be let bubble up
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -147,8 +148,48 @@ public void testBuildUriLeavesPathUntouched() {
}
}

public void testSetHostsWrongArguments() throws IOException {
try (RestClient restClient = createRestClient()) {
restClient.setHosts((HttpHost[]) null);
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try (RestClient restClient = createRestClient()) {
restClient.setHosts();
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try (RestClient restClient = createRestClient()) {
restClient.setHosts((HttpHost) null);
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
try (RestClient restClient = createRestClient()) {
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
}

public void testNullPath() throws IOException {
try (RestClient restClient = createRestClient()) {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}
}

private static RestClient createRestClient() {
HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200)};
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), new Header[]{}, hosts, null, null);
return new RestClient(mock(CloseableHttpAsyncClient.class), randomIntBetween(1_000, 30_000), new Header[]{}, hosts, null, null);
}
}