Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ Property Name Description
``elasticsearch.connect-timeout`` Timeout for connections to Elasticsearch hosts.
``elasticsearch.max-retry-time`` Maximum duration across all retry attempts for a single request.
``elasticsearch.node-refresh-interval`` How often to refresh the list of available Elasticsearch nodes.
``elasticsearch.max-http-connections`` Maximum number of persistent HTTP connections to Elasticsearch.
``elasticsearch.http-thread-count`` Number of threads handling HTTP connections to Elasticsearch.
============================================= ==============================================================================

``elasticsearch.host``
Expand Down Expand Up @@ -127,6 +129,20 @@ This property controls how often the list of available Elasticsearch nodes is re

This property is optional; the default is ``1m``.

``elasticsearch.max-http-connections``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This property controls the maximum number of persistent HTTP connections to Elasticsearch.

This property is optional; the default is ``25``.

``elasticsearch.http-thread-count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This property controls the number of threads handling HTTP connections to Elasticsearch.

This property is optional; the default is number of available processors.

TLS Security
------------
The Elasticsearch connector provides additional security options to support Elasticsearch clusters that have been configured to use TLS.
Expand Down
12 changes: 12 additions & 0 deletions presto-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.5</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -278,6 +284,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
package com.facebook.presto.elasticsearch;

import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.google.common.collect.ImmutableMap;

import java.util.Arrays;
import java.util.Set;

import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.elasticsearch.ElasticsearchMetadata.SUPPORTS_PREDICATES;
import static com.google.common.collect.ImmutableSet.toImmutableSet;

enum BuiltinColumns
Expand Down Expand Up @@ -58,6 +57,11 @@ public Type getType()

public ColumnMetadata getMetadata()
{
return new ColumnMetadata(name, type, "", null, true, ImmutableMap.of(SUPPORTS_PREDICATES, supportsPredicates));
return new ColumnMetadata(name, type, "", null, true);
}

public ColumnHandle getColumnHandle()
{
return new ElasticsearchColumnHandle(name, type, supportsPredicates);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.elasticsearch;

import com.facebook.presto.common.Page;
import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;

import static com.facebook.presto.elasticsearch.ElasticsearchQueryBuilder.buildSearchQuery;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class CountQueryPageSource
implements ConnectorPageSource
{
// This implementation of the page source is used whenever a query doesn't reference any columns
// from the ES table. We need to limit the number of rows per page in case there are projections
// in the query that can cause page sizes to explode. For example: SELECT rand() FROM some_table
private static final int BATCH_SIZE = 10000;

private final long readTimeNanos;
private long remaining;

public CountQueryPageSource(ElasticsearchClient client, ConnectorSession session, ElasticsearchTableHandle table, ElasticsearchSplit split)
{
requireNonNull(client, "client is null");
requireNonNull(session, "session is null");
requireNonNull(table, "table is null");
requireNonNull(split, "split is null");

long start = System.nanoTime();
long count = client.count(
split.getIndex(),
split.getShard(),
buildSearchQuery(session, split.getTupleDomain().transform(ElasticsearchColumnHandle.class::cast), table.getQuery()));

readTimeNanos = System.nanoTime() - start;
remaining = count;
}

@Override
public boolean isFinished()
{
return remaining == 0;
}

@Override
public Page getNextPage()
{
int batch = toIntExact(Math.min(BATCH_SIZE, remaining));
remaining -= batch;

return new Page(batch);
}

@Override
public long getReadTimeNanos()
{
return readTimeNanos;
}

@Override
public long getCompletedBytes()
{
return 0;
}

@Override
public long getCompletedPositions()
{
return BATCH_SIZE;
}

@Override
public long getSystemMemoryUsage()
{
return 0;
}

@Override
public void close()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public enum Security
private Duration connectTimeout = new Duration(1, SECONDS);
private Duration maxRetryTime = new Duration(30, SECONDS);
private Duration nodeRefreshInterval = new Duration(1, MINUTES);
private int maxHttpConnections = 25;
private int httpThreadCount = Runtime.getRuntime().availableProcessors();

private boolean tlsEnabled;
private File keystorePath;
Expand Down Expand Up @@ -195,6 +197,34 @@ public ElasticsearchConfig setNodeRefreshInterval(Duration nodeRefreshInterval)
return this;
}

@Config("elasticsearch.max-http-connections")
@ConfigDescription("Maximum number of persistent HTTP connections to Elasticsearch")
public ElasticsearchConfig setMaxHttpConnections(int maxHttpConnections)
{
this.maxHttpConnections = maxHttpConnections;
return this;
}

@NotNull
public int getMaxHttpConnections()
{
return maxHttpConnections;
}

@Config("elasticsearch.http-thread-count")
@ConfigDescription("Number of threads handling HTTP connections to Elasticsearch")
public ElasticsearchConfig setHttpThreadCount(int httpThreadCount)
{
this.httpThreadCount = httpThreadCount;
return this;
}

@NotNull
public int getHttpThreadCount()
{
return httpThreadCount;
}

public boolean isTlsEnabled()
{
return tlsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
Expand All @@ -62,10 +63,6 @@
public class ElasticsearchMetadata
implements ConnectorMetadata
{
private static final String ORIGINAL_NAME = "original-name";

public static final String SUPPORTS_PREDICATES = "supports-predicates";

private final ElasticsearchClient client;
private final String schemaName;

Expand Down Expand Up @@ -125,30 +122,68 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect

private ConnectorTableMetadata getTableMetadata(String schemaName, String tableName)
{
IndexMetadata metadata = client.getIndexMetadata(tableName);
InternalTableMetadata internalTableMetadata = makeInternalTableMetadata(schemaName, tableName);
return new ConnectorTableMetadata(new SchemaTableName(schemaName, tableName), internalTableMetadata.getColumnMetadata());
}

return new ConnectorTableMetadata(
new SchemaTableName(schemaName, tableName),
toColumnMetadata(metadata));
private InternalTableMetadata makeInternalTableMetadata(ConnectorTableHandle table)
{
ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table;
return makeInternalTableMetadata(handle.getSchema(), handle.getIndex());
}

private List<ColumnMetadata> toColumnMetadata(IndexMetadata metadata)
private InternalTableMetadata makeInternalTableMetadata(String schema, String tableName)
{
ImmutableList.Builder<ColumnMetadata> result = ImmutableList.builder();
IndexMetadata metadata = client.getIndexMetadata(tableName);
List<IndexMetadata.Field> fields = getColumnFields(metadata);
return new InternalTableMetadata(new SchemaTableName(schema, tableName), makeColumnMetadata(fields), makeColumnHandles(fields));
}

result.add(BuiltinColumns.ID.getMetadata());
result.add(BuiltinColumns.SOURCE.getMetadata());
result.add(BuiltinColumns.SCORE.getMetadata());
private List<IndexMetadata.Field> getColumnFields(IndexMetadata metadata)
{
ImmutableList.Builder<IndexMetadata.Field> result = ImmutableList.builder();

Map<String, Long> counts = metadata.getSchema()
.getFields().stream()
.collect(Collectors.groupingBy(f -> f.getName().toLowerCase(ENGLISH), Collectors.counting()));

for (IndexMetadata.Field field : metadata.getSchema().getFields()) {
Type type = toPrestoType(field);
if (type == null) {
if (type == null || counts.get(field.getName().toLowerCase(ENGLISH)) > 1) {
continue;
}
result.add(field);
}
return result.build();
}

private List<ColumnMetadata> makeColumnMetadata(List<IndexMetadata.Field> fields)
{
ImmutableList.Builder<ColumnMetadata> result = ImmutableList.builder();

result.add(makeColumnMetadata(field.getName(), type, supportsPredicates(field.getType())));
for (BuiltinColumns builtinColumn : BuiltinColumns.values()) {
result.add(builtinColumn.getMetadata());
}

for (IndexMetadata.Field field : fields) {
result.add(new ColumnMetadata(field.getName(), toPrestoType(field)));
}
return result.build();
}

private Map<String, ColumnHandle> makeColumnHandles(List<IndexMetadata.Field> fields)
{
ImmutableMap.Builder<String, ColumnHandle> result = ImmutableMap.builder();
for (BuiltinColumns builtinColumn : BuiltinColumns.values()) {
result.put(builtinColumn.getName(), builtinColumn.getColumnHandle());
}

for (IndexMetadata.Field field : fields) {
result.put(field.getName(), new ElasticsearchColumnHandle(
field.getName(),
toPrestoType(field),
supportsPredicates(field.getType())));
}
return result.build();
}

Expand Down Expand Up @@ -253,24 +288,15 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
ImmutableMap.Builder<String, ColumnHandle> results = ImmutableMap.builder();

ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableHandle);
for (ColumnMetadata column : tableMetadata.getColumns()) {
results.put(column.getName(), new ElasticsearchColumnHandle(
(String) column.getProperties().getOrDefault(ORIGINAL_NAME, column.getName()),
column.getType(),
(Boolean) column.getProperties().get(SUPPORTS_PREDICATES)));
}

return results.build();
InternalTableMetadata tableMetadata = makeInternalTableMetadata(tableHandle);
return tableMetadata.getColumnHandles();
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
ElasticsearchColumnHandle handle = (ElasticsearchColumnHandle) columnHandle;
return makeColumnMetadata(handle.getName(), handle.getType(), handle.isSupportsPredicates());
return new ColumnMetadata(handle.getName(), handle.getType());
}

@Override
Expand All @@ -290,16 +316,35 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
.collect(toImmutableMap(ConnectorTableMetadata::getTable, ConnectorTableMetadata::getColumns));
}

private static ColumnMetadata makeColumnMetadata(String name, Type type, boolean supportsPredicates)
private static class InternalTableMetadata
{
return new ColumnMetadata(
name,
type,
null,
null,
false,
ImmutableMap.of(
ORIGINAL_NAME, name,
SUPPORTS_PREDICATES, supportsPredicates));
private final SchemaTableName tableName;
private final List<ColumnMetadata> columnMetadata;
private final Map<String, ColumnHandle> columnHandles;

public InternalTableMetadata(
SchemaTableName tableName,
List<ColumnMetadata> columnMetadata,
Map<String, ColumnHandle> columnHandles)
{
this.tableName = tableName;
this.columnMetadata = columnMetadata;
this.columnHandles = columnHandles;
}

public SchemaTableName getTableName()
{
return tableName;
}

public List<ColumnMetadata> getColumnMetadata()
{
return columnMetadata;
}

public Map<String, ColumnHandle> getColumnHandles()
{
return columnHandles;
}
}
}
Loading