Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
add system default route planner to JestClientFactory so that system …
Browse files Browse the repository at this point in the history
…default proxy is used. - #56
  • Loading branch information
Cihat Keser committed Mar 30, 2014
1 parent b5d3720 commit 4b8300b
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 55 deletions.
5 changes: 5 additions & 0 deletions jest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>org.littleshoot</groupId>
<artifactId>littleproxy</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
116 changes: 61 additions & 55 deletions jest/src/main/java/io/searchbox/client/JestClientFactory.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package io.searchbox.client;

import com.google.gson.Gson;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.http.JestHttpClient;

import java.util.LinkedHashSet;
import java.util.Map;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.HttpRoutePlanner;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import java.net.ProxySelector;
import java.util.LinkedHashSet;
import java.util.Map;

/**
* @author Dogukan Sonmez
Expand Down Expand Up @@ -60,65 +61,70 @@ public JestClient getObject() {
client.setServers(servers);
}

client.setAsyncClient(HttpAsyncClients.createDefault());
client.setAsyncClient(HttpAsyncClients.custom().setRoutePlanner(getRoutePlanner()).build());
return client;
}

private CloseableHttpClient createHttpClient() {
return configureHttpClient(HttpClients.custom()
.setConnectionManager(createConnectionManager())
.setDefaultRequestConfig(createRequestConfig()))
.build();
}

/**
* Extension point
*
* Example:
* <pre>
* final JestClientFactory factory = new JestClientFactory() {
* {@literal @Override}
private CloseableHttpClient createHttpClient() {
return configureHttpClient(HttpClients.custom()
.setConnectionManager(createConnectionManager())
.setDefaultRequestConfig(createRequestConfig()))
.setRoutePlanner(getRoutePlanner())
.build();
}

/**
* Extension point
* <p/>
* Example:
* <pre>
* final JestClientFactory factory = new JestClientFactory() {
* {@literal @Override}
* protected HttpClientBuilder configureHttpClient(HttpClientBuilder builder) {
* return builder.setDefaultHeaders(...);
* }
* }
* }
* </pre>
*
* @param builder
* @return
*/
protected HttpClientBuilder configureHttpClient(final HttpClientBuilder builder) {
return builder;
}

protected RequestConfig createRequestConfig() {
return RequestConfig.custom()
.setConnectionRequestTimeout(httpClientConfig.getConnTimeout())
.setSocketTimeout(httpClientConfig.getReadTimeout())
.build();
}

protected HttpClientConnectionManager createConnectionManager() {
if(httpClientConfig.isMultiThreaded()) {
*
* @param builder
* @return
*/
protected HttpClientBuilder configureHttpClient(final HttpClientBuilder builder) {
return builder;
}

protected HttpRoutePlanner getRoutePlanner() {
return new SystemDefaultRoutePlanner(ProxySelector.getDefault());
}

protected RequestConfig createRequestConfig() {
return RequestConfig.custom()
.setConnectionRequestTimeout(httpClientConfig.getConnTimeout())
.setSocketTimeout(httpClientConfig.getReadTimeout())
.build();
}

protected HttpClientConnectionManager createConnectionManager() {
if (httpClientConfig.isMultiThreaded()) {
log.debug("Multi-threaded http connection manager created");
final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
final Integer maxTotal = httpClientConfig.getMaxTotalConnection();
if (maxTotal != null) {
cm.setMaxTotal(maxTotal);
}
final Integer defaultMaxPerRoute = httpClientConfig.getDefaultMaxTotalConnectionPerRoute();
if (defaultMaxPerRoute != null) {
cm.setDefaultMaxPerRoute(defaultMaxPerRoute);
}
final Map<HttpRoute, Integer> maxPerRoute = httpClientConfig.getMaxTotalConnectionPerRoute();
for (HttpRoute route : maxPerRoute.keySet()) {
cm.setMaxPerRoute(route, maxPerRoute.get(route));
}
return cm;
}
final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
final Integer maxTotal = httpClientConfig.getMaxTotalConnection();
if (maxTotal != null) {
cm.setMaxTotal(maxTotal);
}
final Integer defaultMaxPerRoute = httpClientConfig.getDefaultMaxTotalConnectionPerRoute();
if (defaultMaxPerRoute != null) {
cm.setDefaultMaxPerRoute(defaultMaxPerRoute);
}
final Map<HttpRoute, Integer> maxPerRoute = httpClientConfig.getMaxTotalConnectionPerRoute();
for (HttpRoute route : maxPerRoute.keySet()) {
cm.setMaxPerRoute(route, maxPerRoute.get(route));
}
return cm;
}
log.debug("Default http connection is created without multi threaded option");
return new BasicHttpClientConnectionManager();
}
return new BasicHttpClientConnectionManager();
}

public Class<?> getObjectType() {
return JestClient.class;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package io.searchbox.client.http;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.JestResultHandler;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.indices.Status;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.*;
import org.littleshoot.proxy.HttpFilters;
import org.littleshoot.proxy.HttpFiltersAdapter;
import org.littleshoot.proxy.HttpFiltersSourceAdapter;
import org.littleshoot.proxy.HttpProxyServer;
import org.littleshoot.proxy.impl.DefaultHttpProxyServer;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author cihat keser
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0)
public class JestHttpClientProxyIntegrationTest extends ElasticsearchIntegrationTest {

private AtomicInteger numProxyRequests = new AtomicInteger(0);
private JestClientFactory factory = new JestClientFactory();
private HttpProxyServer server = null;

private static String nonProxyHostsDefault;
private static String proxyHostDefault;
private static String proxyPortDefault;
private static String useSystemProxiesDefault;

@BeforeClass
public static void setupOnce() throws URISyntaxException {
proxyHostDefault = System.getProperty("http.proxyHost");
proxyPortDefault = System.getProperty("http.proxyPort");
useSystemProxiesDefault = System.getProperty("java.net.useSystemProxies");

System.setProperty("http.proxyHost", "localhost");
System.setProperty("http.proxyPort", "8790");
nonProxyHostsDefault = System.getProperty("http.nonProxyHosts");
System.setProperty("http.nonProxyHosts", ""); // we want localhost to go through proxy
System.setProperty("java.net.useSystemProxies", "true");
}

@Before
public void setup() {
server = DefaultHttpProxyServer
.bootstrap()
.withPort(8790)
.withFiltersSource(new HttpFiltersSourceAdapter() {
public HttpFilters filterRequest(HttpRequest originalRequest, ChannelHandlerContext ctx) {
return new HttpFiltersAdapter(originalRequest) {
@Override
public HttpResponse requestPre(HttpObject httpObject) {
if (httpObject instanceof HttpRequest) {
if (((HttpRequest) httpObject).getUri().contains("localhost:9200")) {
numProxyRequests.incrementAndGet();
}
}
return null;
}

@Override
public HttpResponse requestPost(HttpObject httpObject) {
return null;
}

@Override
public HttpObject responsePre(HttpObject httpObject) {
return httpObject;
}

@Override
public HttpObject responsePost(HttpObject httpObject) {
return httpObject;
}
};
}
})
.start();
}

@After
public void destroy() {
if (server != null) server.stop();
}

@AfterClass
public static void destroyOnce() {
rollbackSystemProperty("http.proxyHost", proxyHostDefault);
rollbackSystemProperty("http.proxyPort", proxyPortDefault);
rollbackSystemProperty("http.nonProxyHosts", nonProxyHostsDefault);
rollbackSystemProperty("java.net.useSystemProxies", useSystemProxiesDefault);
}

private static void rollbackSystemProperty(String key, String value) {
if (value == null) {
System.clearProperty(key);
} else {
System.setProperty(key, value);
}
}

@Test
public void testConnectionThroughDefaultProxy() throws IOException, ExecutionException, InterruptedException {
cluster().ensureAtLeastNumNodes(1);

factory.setHttpClientConfig(new HttpClientConfig
.Builder("http://localhost:9200")
.build());
JestHttpClient jestClient = (JestHttpClient) factory.getObject();
assertNotNull(jestClient);

JestResult result = jestClient.execute(new Status.Builder().build());
assertNotNull(result);
assertEquals(1, numProxyRequests.intValue());

factory.setHttpClientConfig(new HttpClientConfig
.Builder("http://localhost:9200")
.multiThreaded(true)
.build());
jestClient = (JestHttpClient) factory.getObject();
assertNotNull(jestClient);

final AtomicBoolean actionExecuted = new AtomicBoolean(false);
jestClient.executeAsync(new Status.Builder().build(), new JestResultHandler<JestResult>() {
@Override
public void completed(JestResult result) {
actionExecuted.set(true);
}

@Override
public void failed(Exception ex) {
actionExecuted.set(false);
}
});
int retries = 0;
while (!actionExecuted.get() && retries < 10) {
Thread.sleep(200);
retries++;
}
assertEquals(2, numProxyRequests.intValue());
}
}
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.littleshoot</groupId>
<artifactId>littleproxy</artifactId>
<version>1.0.0-beta7</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down

0 comments on commit 4b8300b

Please sign in to comment.