diff --git a/presto-docs/src/main/sphinx/connector/elasticsearch.rst b/presto-docs/src/main/sphinx/connector/elasticsearch.rst
index ea43ca24b06bb..52441fbf1ab91 100644
--- a/presto-docs/src/main/sphinx/connector/elasticsearch.rst
+++ b/presto-docs/src/main/sphinx/connector/elasticsearch.rst
@@ -49,6 +49,8 @@ Property Name Description
``elasticsearch.connect-timeout`` Timeout for connections to Elasticsearch hosts.
``elasticsearch.max-retry-time`` Maximum duration across all retry attempts for a single request.
``elasticsearch.node-refresh-interval`` How often to refresh the list of available Elasticsearch nodes.
+``elasticsearch.max-http-connections`` Maximum number of persistent HTTP connections to Elasticsearch.
+``elasticsearch.http-thread-count`` Number of threads handling HTTP connections to Elasticsearch.
============================================= ==============================================================================
``elasticsearch.host``
@@ -127,6 +129,20 @@ This property controls how often the list of available Elasticsearch nodes is re
This property is optional; the default is ``1m``.
+``elasticsearch.max-http-connections``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This property controls the maximum number of persistent HTTP connections to Elasticsearch.
+
+This property is optional; the default is ``25``.
+
+``elasticsearch.http-thread-count``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This property controls the number of threads handling HTTP connections to Elasticsearch.
+
+This property is optional; the default is number of available processors.
+
TLS Security
------------
The Elasticsearch connector provides additional security options to support Elasticsearch clusters that have been configured to use TLS.
diff --git a/presto-elasticsearch/pom.xml b/presto-elasticsearch/pom.xml
index 80204cffa60ba..25c03bf3b1495 100644
--- a/presto-elasticsearch/pom.xml
+++ b/presto-elasticsearch/pom.xml
@@ -118,6 +118,12 @@
+
+ org.apache.httpcomponents
+ httpcore-nio
+ 4.4.5
+
+
org.apache.httpcomponents
httpclient
@@ -278,6 +284,12 @@
test
+
+ org.assertj
+ assertj-core
+ test
+
+
org.elasticsearch.plugin
transport-netty4-client
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/BuiltinColumns.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/BuiltinColumns.java
index 83764b7a0d0b6..013355ca4ffe1 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/BuiltinColumns.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/BuiltinColumns.java
@@ -14,15 +14,14 @@
package com.facebook.presto.elasticsearch;
import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
-import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Set;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
-import static com.facebook.presto.elasticsearch.ElasticsearchMetadata.SUPPORTS_PREDICATES;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
enum BuiltinColumns
@@ -58,6 +57,11 @@ public Type getType()
public ColumnMetadata getMetadata()
{
- return new ColumnMetadata(name, type, "", null, true, ImmutableMap.of(SUPPORTS_PREDICATES, supportsPredicates));
+ return new ColumnMetadata(name, type, "", null, true);
+ }
+
+ public ColumnHandle getColumnHandle()
+ {
+ return new ElasticsearchColumnHandle(name, type, supportsPredicates);
}
}
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/CountQueryPageSource.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/CountQueryPageSource.java
new file mode 100644
index 0000000000000..06f33e67d312d
--- /dev/null
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/CountQueryPageSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch;
+
+import com.facebook.presto.common.Page;
+import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+
+import static com.facebook.presto.elasticsearch.ElasticsearchQueryBuilder.buildSearchQuery;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+public class CountQueryPageSource
+ implements ConnectorPageSource
+{
+ // This implementation of the page source is used whenever a query doesn't reference any columns
+ // from the ES table. We need to limit the number of rows per page in case there are projections
+ // in the query that can cause page sizes to explode. For example: SELECT rand() FROM some_table
+ private static final int BATCH_SIZE = 10000;
+
+ private final long readTimeNanos;
+ private long remaining;
+
+ public CountQueryPageSource(ElasticsearchClient client, ConnectorSession session, ElasticsearchTableHandle table, ElasticsearchSplit split)
+ {
+ requireNonNull(client, "client is null");
+ requireNonNull(session, "session is null");
+ requireNonNull(table, "table is null");
+ requireNonNull(split, "split is null");
+
+ long start = System.nanoTime();
+ long count = client.count(
+ split.getIndex(),
+ split.getShard(),
+ buildSearchQuery(session, split.getTupleDomain().transform(ElasticsearchColumnHandle.class::cast), table.getQuery()));
+
+ readTimeNanos = System.nanoTime() - start;
+ remaining = count;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return remaining == 0;
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ int batch = toIntExact(Math.min(BATCH_SIZE, remaining));
+ remaining -= batch;
+
+ return new Page(batch);
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return readTimeNanos;
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getCompletedPositions()
+ {
+ return BATCH_SIZE;
+ }
+
+ @Override
+ public long getSystemMemoryUsage()
+ {
+ return 0;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java
index a86fa259eb55c..4822bd7b6756f 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java
@@ -45,6 +45,8 @@ public enum Security
private Duration connectTimeout = new Duration(1, SECONDS);
private Duration maxRetryTime = new Duration(30, SECONDS);
private Duration nodeRefreshInterval = new Duration(1, MINUTES);
+ private int maxHttpConnections = 25;
+ private int httpThreadCount = Runtime.getRuntime().availableProcessors();
private boolean tlsEnabled;
private File keystorePath;
@@ -195,6 +197,34 @@ public ElasticsearchConfig setNodeRefreshInterval(Duration nodeRefreshInterval)
return this;
}
+ @Config("elasticsearch.max-http-connections")
+ @ConfigDescription("Maximum number of persistent HTTP connections to Elasticsearch")
+ public ElasticsearchConfig setMaxHttpConnections(int maxHttpConnections)
+ {
+ this.maxHttpConnections = maxHttpConnections;
+ return this;
+ }
+
+ @NotNull
+ public int getMaxHttpConnections()
+ {
+ return maxHttpConnections;
+ }
+
+ @Config("elasticsearch.http-thread-count")
+ @ConfigDescription("Number of threads handling HTTP connections to Elasticsearch")
+ public ElasticsearchConfig setHttpThreadCount(int httpThreadCount)
+ {
+ this.httpThreadCount = httpThreadCount;
+ return this;
+ }
+
+ @NotNull
+ public int getHttpThreadCount()
+ {
+ return httpThreadCount;
+ }
+
public boolean isTlsEnabled()
{
return tlsEnabled;
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java
index c2ca18776d285..0e9f6e2c0a823 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchMetadata.java
@@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
@@ -62,10 +63,6 @@
public class ElasticsearchMetadata
implements ConnectorMetadata
{
- private static final String ORIGINAL_NAME = "original-name";
-
- public static final String SUPPORTS_PREDICATES = "supports-predicates";
-
private final ElasticsearchClient client;
private final String schemaName;
@@ -125,30 +122,68 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
private ConnectorTableMetadata getTableMetadata(String schemaName, String tableName)
{
- IndexMetadata metadata = client.getIndexMetadata(tableName);
+ InternalTableMetadata internalTableMetadata = makeInternalTableMetadata(schemaName, tableName);
+ return new ConnectorTableMetadata(new SchemaTableName(schemaName, tableName), internalTableMetadata.getColumnMetadata());
+ }
- return new ConnectorTableMetadata(
- new SchemaTableName(schemaName, tableName),
- toColumnMetadata(metadata));
+ private InternalTableMetadata makeInternalTableMetadata(ConnectorTableHandle table)
+ {
+ ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table;
+ return makeInternalTableMetadata(handle.getSchema(), handle.getIndex());
}
- private List toColumnMetadata(IndexMetadata metadata)
+ private InternalTableMetadata makeInternalTableMetadata(String schema, String tableName)
{
- ImmutableList.Builder result = ImmutableList.builder();
+ IndexMetadata metadata = client.getIndexMetadata(tableName);
+ List fields = getColumnFields(metadata);
+ return new InternalTableMetadata(new SchemaTableName(schema, tableName), makeColumnMetadata(fields), makeColumnHandles(fields));
+ }
- result.add(BuiltinColumns.ID.getMetadata());
- result.add(BuiltinColumns.SOURCE.getMetadata());
- result.add(BuiltinColumns.SCORE.getMetadata());
+ private List getColumnFields(IndexMetadata metadata)
+ {
+ ImmutableList.Builder result = ImmutableList.builder();
+
+ Map counts = metadata.getSchema()
+ .getFields().stream()
+ .collect(Collectors.groupingBy(f -> f.getName().toLowerCase(ENGLISH), Collectors.counting()));
for (IndexMetadata.Field field : metadata.getSchema().getFields()) {
Type type = toPrestoType(field);
- if (type == null) {
+ if (type == null || counts.get(field.getName().toLowerCase(ENGLISH)) > 1) {
continue;
}
+ result.add(field);
+ }
+ return result.build();
+ }
+
+ private List makeColumnMetadata(List fields)
+ {
+ ImmutableList.Builder result = ImmutableList.builder();
- result.add(makeColumnMetadata(field.getName(), type, supportsPredicates(field.getType())));
+ for (BuiltinColumns builtinColumn : BuiltinColumns.values()) {
+ result.add(builtinColumn.getMetadata());
}
+ for (IndexMetadata.Field field : fields) {
+ result.add(new ColumnMetadata(field.getName(), toPrestoType(field)));
+ }
+ return result.build();
+ }
+
+ private Map makeColumnHandles(List fields)
+ {
+ ImmutableMap.Builder result = ImmutableMap.builder();
+ for (BuiltinColumns builtinColumn : BuiltinColumns.values()) {
+ result.put(builtinColumn.getName(), builtinColumn.getColumnHandle());
+ }
+
+ for (IndexMetadata.Field field : fields) {
+ result.put(field.getName(), new ElasticsearchColumnHandle(
+ field.getName(),
+ toPrestoType(field),
+ supportsPredicates(field.getType())));
+ }
return result.build();
}
@@ -253,24 +288,15 @@ public List listTables(ConnectorSession session, Optional getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
- ImmutableMap.Builder results = ImmutableMap.builder();
-
- ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableHandle);
- for (ColumnMetadata column : tableMetadata.getColumns()) {
- results.put(column.getName(), new ElasticsearchColumnHandle(
- (String) column.getProperties().getOrDefault(ORIGINAL_NAME, column.getName()),
- column.getType(),
- (Boolean) column.getProperties().get(SUPPORTS_PREDICATES)));
- }
-
- return results.build();
+ InternalTableMetadata tableMetadata = makeInternalTableMetadata(tableHandle);
+ return tableMetadata.getColumnHandles();
}
@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
ElasticsearchColumnHandle handle = (ElasticsearchColumnHandle) columnHandle;
- return makeColumnMetadata(handle.getName(), handle.getType(), handle.isSupportsPredicates());
+ return new ColumnMetadata(handle.getName(), handle.getType());
}
@Override
@@ -290,16 +316,35 @@ public Map> listTableColumns(ConnectorSess
.collect(toImmutableMap(ConnectorTableMetadata::getTable, ConnectorTableMetadata::getColumns));
}
- private static ColumnMetadata makeColumnMetadata(String name, Type type, boolean supportsPredicates)
+ private static class InternalTableMetadata
{
- return new ColumnMetadata(
- name,
- type,
- null,
- null,
- false,
- ImmutableMap.of(
- ORIGINAL_NAME, name,
- SUPPORTS_PREDICATES, supportsPredicates));
+ private final SchemaTableName tableName;
+ private final List columnMetadata;
+ private final Map columnHandles;
+
+ public InternalTableMetadata(
+ SchemaTableName tableName,
+ List columnMetadata,
+ Map columnHandles)
+ {
+ this.tableName = tableName;
+ this.columnMetadata = columnMetadata;
+ this.columnHandles = columnHandles;
+ }
+
+ public SchemaTableName getTableName()
+ {
+ return tableName;
+ }
+
+ public List getColumnMetadata()
+ {
+ return columnMetadata;
+ }
+
+ public Map getColumnHandles()
+ {
+ return columnHandles;
+ }
}
}
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSourceProvider.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSourceProvider.java
index 2c41bd04e0b28..50c197e013adf 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSourceProvider.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSourceProvider.java
@@ -53,11 +53,16 @@ public ConnectorPageSource createPageSource(
requireNonNull(split, "split is null");
requireNonNull(layout, "layout is null");
ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout;
- return new ElasticsearchPageSource(
+ ElasticsearchSplit elasticsearchSplit = (ElasticsearchSplit) split;
+
+ if (columns.isEmpty()) {
+ return new CountQueryPageSource(client, session, layoutHandle.getTable(), elasticsearchSplit);
+ }
+ return new ScanQueryPageSource(
client,
session,
layoutHandle.getTable(),
- (ElasticsearchSplit) split,
+ elasticsearchSplit,
columns.stream()
.map(ElasticsearchColumnHandle.class::cast)
.collect(toImmutableList()));
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSource.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java
similarity index 96%
rename from presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSource.java
rename to presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java
index ba021b7e26b8f..63b42433bfcd9 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPageSource.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ScanQueryPageSource.java
@@ -72,10 +72,10 @@
import static java.util.function.Predicate.isEqual;
import static java.util.stream.Collectors.toList;
-public class ElasticsearchPageSource
+public class ScanQueryPageSource
implements ConnectorPageSource
{
- private static final Logger LOG = Logger.get(ElasticsearchPageSource.class);
+ private static final Logger LOG = Logger.get(ScanQueryPageSource.class);
private final List decoders;
@@ -84,10 +84,9 @@ public class ElasticsearchPageSource
private final List columns;
private long totalBytes;
private long readTimeNanos;
- private boolean finished;
private long completedPositions;
- public ElasticsearchPageSource(
+ public ScanQueryPageSource(
ElasticsearchClient client,
ConnectorSession session,
ElasticsearchTableHandle table,
@@ -166,7 +165,7 @@ public long getReadTimeNanos()
@Override
public boolean isFinished()
{
- return finished || !iterator.hasNext();
+ return !iterator.hasNext();
}
@Override
@@ -184,18 +183,6 @@ public void close()
@Override
public Page getNextPage()
{
- if (columnBuilders.length == 0) {
- // TODO: emit "count" query against Elasticsearch
- int count = 0;
- while (iterator.hasNext()) {
- iterator.next();
- count++;
- }
-
- finished = true;
- return new Page(count);
- }
-
long size = 0;
while (size < PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES && iterator.hasNext()) {
SearchHit hit = iterator.next();
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/CountResponse.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/CountResponse.java
new file mode 100644
index 0000000000000..b6da5e505a7b6
--- /dev/null
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/CountResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch.client;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CountResponse
+{
+ private final long count;
+
+ @JsonCreator
+ public CountResponse(@JsonProperty("count") long count)
+ {
+ this.count = count;
+ }
+
+ @JsonProperty
+ public long getCount()
+ {
+ return count;
+ }
+}
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java
similarity index 84%
rename from presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java
rename to presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java
index 039127f834551..9452ed20118a7 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java
@@ -28,12 +28,19 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
+import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.ClearScrollRequest;
@@ -81,6 +88,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Stream;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
@@ -102,7 +111,9 @@ public class ElasticsearchClient
private static final Logger LOG = Logger.get(ElasticsearchClient.class);
private static final JsonCodec SEARCH_SHARDS_RESPONSE_CODEC = jsonCodec(SearchShardsResponse.class);
private static final JsonCodec NODES_RESPONSE_CODEC = jsonCodec(NodesResponse.class);
+ private static final JsonCodec COUNT_RESPONSE_CODEC = jsonCodec(CountResponse.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
+ private static final Pattern ADDRESS_PATTERN = Pattern.compile("((?[^/]+)/)?(?.+):(?\\d+)");
private final RestHighLevelClient client;
private final int scrollSize;
@@ -174,13 +185,26 @@ private static RestHighLevelClient createClient(ElasticsearchConfig config, Opti
{
RestClientBuilder builder = RestClient.builder(
new HttpHost(config.getHost(), config.getPort(), config.isTlsEnabled() ? "https" : "http"))
- .setRequestConfigCallback(
- configBuilder -> configBuilder
- .setConnectTimeout(toIntExact(config.getConnectTimeout().toMillis()))
- .setSocketTimeout(toIntExact(config.getRequestTimeout().toMillis())))
.setMaxRetryTimeoutMillis((int) config.getMaxRetryTime().toMillis());
- builder.setHttpClientConfigCallback(clientBuilder -> {
+ builder.setHttpClientConfigCallback(ignored -> {
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout(toIntExact(config.getConnectTimeout().toMillis()))
+ .setSocketTimeout(toIntExact(config.getRequestTimeout().toMillis()))
+ .build();
+
+ IOReactorConfig reactorConfig = IOReactorConfig.custom()
+ .setIoThreadCount(config.getHttpThreadCount())
+ .build();
+
+ // the client builder passed to the call-back is configured to use system properties, which makes it
+ // impossible to configure concurrency settings, so we need to build a new one from scratch
+ HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create()
+ .setDefaultRequestConfig(requestConfig)
+ .setDefaultIOReactorConfig(reactorConfig)
+ .setMaxConnPerRoute(config.getMaxHttpConnections())
+ .setMaxConnTotal(config.getMaxHttpConnections());
+
if (config.isTlsEnabled()) {
buildSslContext(config.getKeystorePath(), config.getKeystorePassword(), config.getTrustStorePath(), config.getTruststorePassword())
.ifPresent(clientBuilder::setSSLContext);
@@ -335,7 +359,10 @@ private Set fetchNodes()
NodesResponse.Node node = entry.getValue();
if (node.getRoles().contains("data")) {
- result.add(new ElasticsearchNode(nodeId, node.getAddress()));
+ Optional address = node.getAddress()
+ .flatMap(ElasticsearchClient::extractAddress);
+
+ result.add(new ElasticsearchNode(nodeId, address));
}
}
return result.build();
@@ -579,6 +606,39 @@ public SearchResponse nextPage(String scrollId)
}
}
+ public long count(String index, int shard, QueryBuilder query)
+ {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource()
+ .query(query);
+
+ LOG.debug("Count: %s:%s, query: %s", index, shard, sourceBuilder);
+
+ Response response;
+ try {
+ response = client.getLowLevelClient()
+ .performRequest(
+ "GET",
+ format("/%s/_count?preference=_shards:%s", index, shard),
+ ImmutableMap.of(),
+ new StringEntity(sourceBuilder.toString()),
+ new BasicHeader("Content-Type", "application/json"));
+ }
+ catch (ResponseException e) {
+ throw propagate(e);
+ }
+ catch (IOException e) {
+ throw new PrestoException(ELASTICSEARCH_CONNECTION_ERROR, e);
+ }
+
+ try {
+ return COUNT_RESPONSE_CODEC.fromJson(EntityUtils.toByteArray(response.getEntity()))
+ .getCount();
+ }
+ catch (IOException e) {
+ throw new PrestoException(ELASTICSEARCH_INVALID_RESPONSE, e);
+ }
+ }
+
public void clearScroll(String scrollId)
{
ClearScrollRequest request = new ClearScrollRequest();
@@ -614,6 +674,51 @@ private T doRequest(String path, ResponseHandler handler)
return handler.process(body);
}
+ private static PrestoException propagate(ResponseException exception)
+ {
+ HttpEntity entity = exception.getResponse().getEntity();
+
+ if (entity != null && entity.getContentType() != null) {
+ try {
+ JsonNode reason = OBJECT_MAPPER.readTree(entity.getContent()).path("error")
+ .path("root_cause")
+ .path(0)
+ .path("reason");
+
+ if (!reason.isMissingNode()) {
+ throw new PrestoException(ELASTICSEARCH_QUERY_FAILURE, reason.asText(), exception);
+ }
+ }
+ catch (IOException e) {
+ PrestoException result = new PrestoException(ELASTICSEARCH_QUERY_FAILURE, exception);
+ result.addSuppressed(e);
+ throw result;
+ }
+ }
+
+ throw new PrestoException(ELASTICSEARCH_QUERY_FAILURE, exception);
+ }
+
+ @VisibleForTesting
+ static Optional extractAddress(String address)
+ {
+ Matcher matcher = ADDRESS_PATTERN.matcher(address);
+
+ if (!matcher.matches()) {
+ return Optional.empty();
+ }
+
+ String cname = matcher.group("cname");
+ String ip = matcher.group("ip");
+ String port = matcher.group("port");
+
+ if (cname != null) {
+ return Optional.of(cname + ":" + port);
+ }
+
+ return Optional.of(ip + ":" + port);
+ }
+
private interface ResponseHandler
{
T process(String body);
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/RowDecoder.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/RowDecoder.java
index 97bf58f37a559..4eee60690b2c4 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/RowDecoder.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/RowDecoder.java
@@ -22,7 +22,7 @@
import java.util.function.Supplier;
import static com.facebook.presto.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_TYPE_MISMATCH;
-import static com.facebook.presto.elasticsearch.ElasticsearchPageSource.getField;
+import static com.facebook.presto.elasticsearch.ScanQueryPageSource.getField;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/VarcharDecoder.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/VarcharDecoder.java
index affa2e657cf61..750aabd2e3d24 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/VarcharDecoder.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/decoders/VarcharDecoder.java
@@ -42,11 +42,11 @@ public void decode(SearchHit hit, Supplier