diff --git a/presto-elasticsearch/pom.xml b/presto-elasticsearch/pom.xml
index c7b135d82e4b3..374c84db49f82 100644
--- a/presto-elasticsearch/pom.xml
+++ b/presto-elasticsearch/pom.xml
@@ -101,12 +101,6 @@
-
- org.elasticsearch
- elasticsearch
- ${dep.elasticsearch.version}
-
-
org.elasticsearch.client
elasticsearch-rest-high-level-client
@@ -249,6 +243,53 @@
test
+
+ net.java.dev.jna
+ jna
+ 5.5.0
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ 1.15.2
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+
+
+
+ org.testcontainers
+ elasticsearch
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.commons
+ commons-compress
+
+
+
+
+
+ com.facebook.presto
+ presto-main
+ test-jar
+ test
+
+
com.facebook.presto
presto-client
@@ -327,6 +368,18 @@
+
+
+ org.elasticsearch
+ elasticsearch
+ ${dep.elasticsearch.version}
+
+
+ org.elasticsearch
+ jna
+
+
+
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java
index 4822bd7b6756f..43cfd6ba33d8a 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConfig.java
@@ -32,7 +32,8 @@ public class ElasticsearchConfig
{
public enum Security
{
- AWS
+ AWS,
+ PASSWORD
}
private String host;
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java
index 09762c6aad7d9..4cd973f38793e 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java
@@ -30,6 +30,7 @@
import static com.facebook.airlift.json.JsonBinder.jsonBinder;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.elasticsearch.ElasticsearchConfig.Security.AWS;
+import static com.facebook.presto.elasticsearch.ElasticsearchConfig.Security.PASSWORD;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static java.util.Objects.requireNonNull;
@@ -55,6 +56,7 @@ protected void setup(Binder binder)
binder.install(new DecoderModule());
newOptionalBinder(binder, AwsSecurityConfig.class);
+ newOptionalBinder(binder, PasswordConfig.class);
install(installModuleIf(
ElasticsearchConfig.class,
@@ -62,6 +64,13 @@ protected void setup(Binder binder)
.filter(isEqual(AWS))
.isPresent(),
conditionalBinder -> configBinder(conditionalBinder).bindConfig(AwsSecurityConfig.class)));
+
+ install(installModuleIf(
+ ElasticsearchConfig.class,
+ config -> config.getSecurity()
+ .filter(isEqual(PASSWORD))
+ .isPresent(),
+ conditionalBinder -> configBinder(conditionalBinder).bindConfig(PasswordConfig.class)));
}
private static final class TypeDeserializer
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/PasswordConfig.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/PasswordConfig.java
new file mode 100644
index 0000000000000..d2def0d8a3e40
--- /dev/null
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/PasswordConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch;
+
+import com.facebook.airlift.configuration.Config;
+import com.facebook.airlift.configuration.ConfigSecuritySensitive;
+
+import javax.validation.constraints.NotNull;
+
+public class PasswordConfig
+{
+ private String user;
+ private String password;
+
+ @NotNull
+ public String getUser()
+ {
+ return user;
+ }
+
+ @Config("elasticsearch.auth.user")
+ public PasswordConfig setUser(String user)
+ {
+ this.user = user;
+ return this;
+ }
+
+ @NotNull
+ public String getPassword()
+ {
+ return password;
+ }
+
+ @Config("elasticsearch.auth.password")
+ @ConfigSecuritySensitive
+ public PasswordConfig setPassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+}
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java
index abf46d151d820..55caaf8fef42b 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/client/ElasticsearchClient.java
@@ -24,6 +24,7 @@
import com.facebook.airlift.security.pem.PemReader;
import com.facebook.presto.elasticsearch.AwsSecurityConfig;
import com.facebook.presto.elasticsearch.ElasticsearchConfig;
+import com.facebook.presto.elasticsearch.PasswordConfig;
import com.facebook.presto.spi.PrestoException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,10 +36,14 @@
import io.airlift.units.Duration;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
@@ -129,9 +134,12 @@ public class ElasticsearchClient
private final boolean ignorePublishAddress;
@Inject
- public ElasticsearchClient(ElasticsearchConfig config, Optional awsSecurityConfig)
+ public ElasticsearchClient(
+ ElasticsearchConfig config,
+ Optional awsSecurityConfig,
+ Optional passwordConfig)
{
- client = createClient(config, awsSecurityConfig);
+ client = createClient(config, awsSecurityConfig, passwordConfig);
this.ignorePublishAddress = config.isIgnorePublishAddress();
this.scrollSize = config.getScrollSize();
@@ -183,7 +191,10 @@ private void refreshNodes()
}
}
- private static RestHighLevelClient createClient(ElasticsearchConfig config, Optional awsSecurityConfig)
+ private static RestHighLevelClient createClient(
+ ElasticsearchConfig config,
+ Optional awsSecurityConfig,
+ Optional passwordConfig)
{
RestClientBuilder builder = RestClient.builder(
new HttpHost(config.getHost(), config.getPort(), config.isTlsEnabled() ? "https" : "http"))
@@ -216,6 +227,12 @@ private static RestHighLevelClient createClient(ElasticsearchConfig config, Opti
}
}
+ passwordConfig.ifPresent(securityConfig -> {
+ CredentialsProvider credentials = new BasicCredentialsProvider();
+ credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(securityConfig.getUser(), securityConfig.getPassword()));
+ clientBuilder.setDefaultCredentialsProvider(credentials);
+ });
+
awsSecurityConfig.ifPresent(securityConfig -> clientBuilder.addInterceptorLast(new AwsRequestSigner(
securityConfig.getRegion(),
getAwsCredentialsProvider(securityConfig))));
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorTest.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorTest.java
index c718dcc31d879..6690cd0afc7fc 100644
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorTest.java
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorTest.java
@@ -13,25 +13,47 @@
*/
package com.facebook.presto.elasticsearch;
+import com.facebook.presto.testing.MaterializedResult;
+import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
import io.airlift.tpch.TpchTable;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.intellij.lang.annotations.Language;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
+import java.io.IOException;
+
+import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
-import static com.facebook.presto.elasticsearch.EmbeddedElasticsearchNode.createEmbeddedElasticsearchNode;
-import static org.elasticsearch.client.Requests.indexAliasesRequest;
-import static org.elasticsearch.client.Requests.refreshRequest;
+import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
+import static com.facebook.presto.testing.assertions.Assert.assertEquals;
+import static java.lang.String.format;
+@Test(singleThreaded = true)
public class ElasticsearchConnectorTest
extends AbstractTestIntegrationSmokeTest
{
- private EmbeddedElasticsearchNode embeddedElasticsearchNode;
+ private final String elasticsearchServer = "docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0";
+ private ElasticsearchServer elasticsearch;
+ private RestHighLevelClient client;
+
+ @AfterClass(alwaysRun = true)
+ public final void destroy()
+ throws IOException
+ {
+ elasticsearch.stop();
+ client.close();
+ }
@Test
public void testSelectInformationSchemaForMultiIndexAlias()
+ throws IOException
{
addAlias("nation", "multi_alias");
addAlias("region", "multi_alias");
@@ -132,25 +154,71 @@ public void testSelectInformationSchemaColumns()
protected QueryRunner createQueryRunner()
throws Exception
{
- embeddedElasticsearchNode = createEmbeddedElasticsearchNode();
- return createElasticsearchQueryRunner(embeddedElasticsearchNode, TpchTable.getTables());
+ elasticsearch = new ElasticsearchServer(elasticsearchServer, ImmutableMap.of());
+ HostAndPort address = elasticsearch.getAddress();
+ client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort())));
+
+ return createElasticsearchQueryRunner(elasticsearch.getAddress(),
+ TpchTable.getTables(),
+ ImmutableMap.of(),
+ ImmutableMap.of());
+ }
+
+ @Test
+ @Override
+ public void testDescribeTable()
+ {
+ MaterializedResult actualColumns = computeActual("DESC orders").toTestTypes();
+ MaterializedResult.Builder builder = resultBuilder(getQueryRunner().getDefaultSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR);
+ for (MaterializedRow row : actualColumns.getMaterializedRows()) {
+ builder.row(row.getField(0), row.getField(1), "", "");
+ }
+ MaterializedResult actualResult = builder.build();
+ builder = resultBuilder(getQueryRunner().getDefaultSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR);
+ MaterializedResult expectedColumns = builder
+ .row("clerk", "varchar", "", "")
+ .row("comment", "varchar", "", "")
+ .row("custkey", "bigint", "", "")
+ .row("orderdate", "timestamp", "", "")
+ .row("orderkey", "bigint", "", "")
+ .row("orderpriority", "varchar", "", "")
+ .row("orderstatus", "varchar", "", "")
+ .row("shippriority", "bigint", "", "")
+ .row("totalprice", "real", "", "")
+ .build();
+ assertEquals(actualResult, expectedColumns, format("%s != %s", actualResult, expectedColumns));
+ }
+
+ @Test
+ public void testMultipleRangesPredicate()
+ {
+ assertQuery("" +
+ "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
+ "FROM orders " +
+ "WHERE orderkey BETWEEN 10 AND 50 OR orderkey BETWEEN 100 AND 150");
+ }
+
+ @Test
+ public void testRangePredicate()
+ {
+ // List columns explicitly, as there's no defined order in Elasticsearch
+ assertQuery("" +
+ "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
+ "FROM orders " +
+ "WHERE orderkey BETWEEN 10 AND 50");
+ }
+
+ @Test
+ public void testSelectAll()
+ {
+ // List columns explicitly, as there's no defined order in Elasticsearch
+ assertQuery("SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment FROM orders");
}
private void addAlias(String index, String alias)
+ throws IOException
{
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .aliases(indexAliasesRequest()
- .addAliasAction(IndicesAliasesRequest.AliasActions.add()
- .index(index)
- .alias(alias)))
- .actionGet();
-
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(alias))
- .actionGet();
+ client.getLowLevelClient()
+ .performRequest("PUT", format("/%s/_alias/%s", index, alias));
}
}
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchLoader.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchLoader.java
index 4fd9aea3be022..80545c6488a9b 100644
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchLoader.java
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchLoader.java
@@ -24,7 +24,7 @@
import com.facebook.presto.tests.ResultsSession;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
@@ -42,17 +42,17 @@
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
-import static org.elasticsearch.client.Requests.flushRequest;
+import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class ElasticsearchLoader
extends AbstractTestingPrestoClient
{
private final String tableName;
- private final Client client;
+ RestHighLevelClient restClient;
public ElasticsearchLoader(
- Client client,
+ RestHighLevelClient client,
String tableName,
TestingPrestoServer prestoServer,
Session defaultSession)
@@ -60,7 +60,7 @@ public ElasticsearchLoader(
super(prestoServer, defaultSession);
this.tableName = requireNonNull(tableName, "tableName is null");
- this.client = requireNonNull(client, "client is null");
+ this.restClient = requireNonNull(client, "client is null");
}
@Override
@@ -108,8 +108,13 @@ public void addResults(QueryStatusInfo statusInfo, QueryData data)
throw new UncheckedIOException("Error loading data into Elasticsearch index: " + tableName, e);
}
}
- client.bulk(request).actionGet();
- client.admin().indices().flush(flushRequest(tableName)).actionGet();
+ request.setRefreshPolicy(IMMEDIATE);
+ try {
+ restClient.bulk(request);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java
index fceeb1d74f545..dfb273c52eb1f 100644
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java
@@ -22,7 +22,11 @@
import com.facebook.presto.tests.TestingPrestoClient;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
import io.airlift.tpch.TpchTable;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import java.util.Map;
@@ -42,63 +46,77 @@ private ElasticsearchQueryRunner() {}
private static final String TPCH_SCHEMA = "tpch";
private static final int NODE_COUNT = 2;
- public static DistributedQueryRunner createElasticsearchQueryRunner(EmbeddedElasticsearchNode embeddedElasticsearchNode, Iterable> tables)
+ public static DistributedQueryRunner createElasticsearchQueryRunner(
+ HostAndPort address,
+ Iterable> tables,
+ Map extraProperties,
+ Map extraConnectorProperties)
throws Exception
{
+ RestHighLevelClient client = null;
DistributedQueryRunner queryRunner = null;
try {
queryRunner = DistributedQueryRunner.builder(createSession())
.setNodeCount(NODE_COUNT)
+ .setExtraProperties(extraProperties)
.build();
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
- embeddedElasticsearchNode.start();
-
TestingElasticsearchConnectorFactory testFactory = new TestingElasticsearchConnectorFactory();
- installElasticsearchPlugin(queryRunner, testFactory);
+ installElasticsearchPlugin(address, queryRunner, testFactory, extraConnectorProperties);
TestingPrestoClient prestoClient = queryRunner.getRandomClient();
LOG.info("Loading data...");
+
+ client = new RestHighLevelClient(RestClient.builder(HttpHost.create(address.toString())));
+
long startTime = System.nanoTime();
for (TpchTable> table : tables) {
- loadTpchTopic(embeddedElasticsearchNode, prestoClient, table);
+ loadTpchTopic(client, prestoClient, table);
}
LOG.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));
return queryRunner;
}
catch (Exception e) {
- closeAllSuppress(e, queryRunner, embeddedElasticsearchNode);
+ closeAllSuppress(e, queryRunner, client);
throw e;
}
}
- private static void installElasticsearchPlugin(QueryRunner queryRunner, TestingElasticsearchConnectorFactory factory)
- throws Exception
+ private static void installElasticsearchPlugin(
+ HostAndPort address,
+ QueryRunner queryRunner,
+ TestingElasticsearchConnectorFactory factory,
+ Map extraConnectorProperties)
{
queryRunner.installPlugin(new ElasticsearchPlugin(factory));
Map config = ImmutableMap.builder()
- .put("elasticsearch.host", "localhost")
- .put("elasticsearch.port", "9200")
+ .put("elasticsearch.host", address.getHost())
+ .put("elasticsearch.port", Integer.toString(address.getPort()))
+ // Node discovery relies on the publish_address exposed via the Elasticseach API
+ // This doesn't work well within a docker environment that maps ES's port to a random public port
+ .put("elasticsearch.ignore-publish-address", "true")
.put("elasticsearch.default-schema-name", TPCH_SCHEMA)
.put("elasticsearch.scroll-size", "1000")
.put("elasticsearch.scroll-timeout", "1m")
.put("elasticsearch.max-hits", "1000000")
.put("elasticsearch.request-timeout", "2m")
+ .putAll(extraConnectorProperties)
.build();
queryRunner.createCatalog("elasticsearch", "elasticsearch", config);
}
- private static void loadTpchTopic(EmbeddedElasticsearchNode embeddedElasticsearchNode, TestingPrestoClient prestoClient, TpchTable> table)
+ private static void loadTpchTopic(RestHighLevelClient client, TestingPrestoClient prestoClient, TpchTable> table)
{
long start = System.nanoTime();
LOG.info("Running import for %s", table.getTableName());
- ElasticsearchLoader loader = new ElasticsearchLoader(embeddedElasticsearchNode.getClient(), table.getTableName().toLowerCase(ENGLISH), prestoClient.getServer(), prestoClient.getDefaultSession());
+ ElasticsearchLoader loader = new ElasticsearchLoader(client, table.getTableName().toLowerCase(ENGLISH), prestoClient.getServer(), prestoClient.getDefaultSession());
loader.execute(format("SELECT * from %s", new QualifiedObjectName(TPCH_SCHEMA, TINY_SCHEMA_NAME, table.getTableName().toLowerCase(ENGLISH))));
LOG.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}
@@ -111,9 +129,16 @@ public static Session createSession()
public static void main(String[] args)
throws Exception
{
+ // To start Elasticsearch:
+ // docker run -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.6.2
+
Logging.initialize();
- DistributedQueryRunner queryRunner = createElasticsearchQueryRunner(EmbeddedElasticsearchNode.createEmbeddedElasticsearchNode(), TpchTable.getTables());
- Thread.sleep(10);
+
+ DistributedQueryRunner queryRunner = createElasticsearchQueryRunner(
+ HostAndPort.fromParts("localhost", 9200),
+ TpchTable.getTables(),
+ ImmutableMap.of("http-server.http.port", "8080"),
+ ImmutableMap.of());
Logger log = Logger.get(ElasticsearchQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchServer.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchServer.java
new file mode 100644
index 0000000000000..d4b5dcae621e7
--- /dev/null
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchServer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch;
+
+import com.google.common.net.HostAndPort;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import static com.google.common.io.Files.createTempDir;
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testcontainers.utility.MountableFile.forHostPath;
+
+public class ElasticsearchServer
+{
+ private final String containerPath = "/usr/share/elasticsearch/config/";
+ private final Path configurationPath;
+ private final ElasticsearchContainer container;
+
+ public ElasticsearchServer(String image, Map configurationFiles)
+ throws IOException
+ {
+ container = new ElasticsearchContainer(image);
+
+ configurationPath = createTempDir().toPath();
+ for (Map.Entry entry : configurationFiles.entrySet()) {
+ String name = entry.getKey();
+ byte[] contents = entry.getValue().getBytes(UTF_8);
+
+ Path path = configurationPath.resolve(name);
+ Files.write(path, contents);
+ container.withCopyFileToContainer(forHostPath(path), containerPath + name);
+ }
+
+ container.start();
+ }
+
+ public void stop()
+ throws IOException
+ {
+ container.close();
+ deleteRecursively(configurationPath, ALLOW_INSECURE);
+ }
+
+ public HostAndPort getAddress()
+ {
+ return HostAndPort.fromString(container.getHttpHostAddress());
+ }
+}
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/EmbeddedElasticsearchNode.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/EmbeddedElasticsearchNode.java
deleted file mode 100644
index 3be878e274cf2..0000000000000
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/EmbeddedElasticsearchNode.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.elasticsearch;
-
-import com.google.common.collect.ImmutableList;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.transport.Netty4Plugin;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.io.MoreFiles.deleteRecursively;
-import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
-
-public class EmbeddedElasticsearchNode
- implements Closeable
-{
- private final File elasticsearchDirectory;
- private final AtomicBoolean running = new AtomicBoolean();
-
- private ElasticsearchNode node;
-
- public static EmbeddedElasticsearchNode createEmbeddedElasticsearchNode()
- {
- return new EmbeddedElasticsearchNode();
- }
-
- EmbeddedElasticsearchNode()
- {
- try {
- elasticsearchDirectory = File.createTempFile("elasticsearch", "test");
- elasticsearchDirectory.delete();
- elasticsearchDirectory.mkdir();
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- Settings setting = Settings.builder()
- .put("cluster.name", "test")
- .put("path.home", elasticsearchDirectory.getPath())
- .put("path.data", new File(elasticsearchDirectory, "data").getAbsolutePath())
- .put("path.logs", new File(elasticsearchDirectory, "logs").getAbsolutePath())
- .put("transport.type.default", "local")
- .put("transport.type", "netty4")
- .put("http.type", "netty4")
- .put("http.enabled", "true")
- .put("path.home", "elasticsearch-test-data")
- .build();
- node = new ElasticsearchNode(setting, ImmutableList.of(Netty4Plugin.class));
- }
-
- public void start()
- throws Exception
- {
- if (running.compareAndSet(false, true)) {
- node.start();
- }
- }
-
- @Override
- public void close()
- throws IOException
- {
- if (running.compareAndSet(true, false)) {
- node = null;
- deleteRecursively(elasticsearchDirectory.toPath(), ALLOW_INSECURE);
- }
- }
-
- public Client getClient()
- {
- return node.client();
- }
-}
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
index e225db68cb7af..fc15c6daf5d5e 100644
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
@@ -20,9 +20,15 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
+import com.google.common.net.HostAndPort;
import io.airlift.tpch.TpchTable;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.common.xcontent.XContentType;
+import org.apache.http.HttpHost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -34,34 +40,41 @@
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
-import static com.facebook.presto.elasticsearch.EmbeddedElasticsearchNode.createEmbeddedElasticsearchNode;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.elasticsearch.client.Requests.indexAliasesRequest;
-import static org.elasticsearch.client.Requests.refreshRequest;
@Test(singleThreaded = true)
public class TestElasticsearchIntegrationSmokeTest
extends AbstractTestIntegrationSmokeTest
{
- private EmbeddedElasticsearchNode embeddedElasticsearchNode;
+ private final String elasticsearchServer = "docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0";
+ private ElasticsearchServer elasticsearch;
+ private RestHighLevelClient client;
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
- embeddedElasticsearchNode = createEmbeddedElasticsearchNode();
- return createElasticsearchQueryRunner(embeddedElasticsearchNode, TpchTable.getTables());
+ elasticsearch = new ElasticsearchServer(elasticsearchServer, ImmutableMap.of());
+
+ HostAndPort address = elasticsearch.getAddress();
+ client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort())));
+
+ return createElasticsearchQueryRunner(elasticsearch.getAddress(),
+ TpchTable.getTables(),
+ ImmutableMap.of(),
+ ImmutableMap.of());
}
@AfterClass(alwaysRun = true)
public final void destroy()
throws IOException
{
- embeddedElasticsearchNode.close();
+ elasticsearch.stop();
+ client.close();
}
@Test
@@ -143,92 +156,93 @@ public void testShowCreateTable()
@Test
public void testArrayFields()
+ throws IOException
{
String indexName = "test_arrays";
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc", "" +
- "{" +
- " \"_meta\": {" +
- " \"presto\": {" +
- " \"a\": {" +
- " \"b\": {" +
- " \"y\": {" +
- " \"isArray\": true" +
- " }" +
- " }" +
- " }," +
- " \"c\": {" +
- " \"f\": {" +
- " \"g\": {" +
- " \"isArray\": true" +
- " }," +
- " \"isArray\": true" +
- " }" +
- " }," +
- " \"j\": {" +
- " \"isArray\": true" +
- " }," +
- " \"k\": {" +
- " \"isArray\": true" +
- " }" +
- " }" +
- " }," +
- " \"properties\":{" +
- " \"a\": {" +
- " \"type\": \"object\"," +
- " \"properties\": {" +
- " \"b\": {" +
- " \"type\": \"object\"," +
- " \"properties\": {" +
- " \"x\": {" +
- " \"type\": \"integer\"" +
- " }," +
- " \"y\": {" +
- " \"type\": \"keyword\"" +
- " }" +
- " } " +
- " }" +
- " }" +
- " }," +
- " \"c\": {" +
- " \"type\": \"object\"," +
- " \"properties\": {" +
- " \"d\": {" +
- " \"type\": \"keyword\"" +
- " }," +
- " \"e\": {" +
- " \"type\": \"keyword\"" +
- " }," +
- " \"f\": {" +
- " \"type\": \"object\"," +
- " \"properties\": {" +
- " \"g\": {" +
- " \"type\": \"integer\"" +
- " }," +
- " \"h\": {" +
- " \"type\": \"integer\"" +
- " }" +
- " } " +
- " }" +
- " }" +
- " }," +
- " \"i\": {" +
- " \"type\": \"long\"" +
- " }," +
- " \"j\": {" +
- " \"type\": \"long\"" +
- " }," +
- " \"k\": {" +
- " \"type\": \"long\"" +
- " }" +
- " }" +
- "}",
- XContentType.JSON)
- .get();
+ String mapping = "" +
+ "{" +
+ " \"mappings\": {" +
+ " \"doc\": {" +
+ " \"_meta\": {" +
+ " \"presto\": {" +
+ " \"a\": {" +
+ " \"b\": {" +
+ " \"y\": {" +
+ " \"isArray\": true" +
+ " }" +
+ " }" +
+ " }," +
+ " \"c\": {" +
+ " \"f\": {" +
+ " \"g\": {" +
+ " \"isArray\": true" +
+ " }," +
+ " \"isArray\": true" +
+ " }" +
+ " }," +
+ " \"j\": {" +
+ " \"isArray\": true" +
+ " }," +
+ " \"k\": {" +
+ " \"isArray\": true" +
+ " }" +
+ " }" +
+ " }," +
+ " \"properties\":{" +
+ " \"a\": {" +
+ " \"type\": \"object\"," +
+ " \"properties\": {" +
+ " \"b\": {" +
+ " \"type\": \"object\"," +
+ " \"properties\": {" +
+ " \"x\": {" +
+ " \"type\": \"integer\"" +
+ " }," +
+ " \"y\": {" +
+ " \"type\": \"keyword\"" +
+ " }" +
+ " } " +
+ " }" +
+ " }" +
+ " }," +
+ " \"c\": {" +
+ " \"type\": \"object\"," +
+ " \"properties\": {" +
+ " \"d\": {" +
+ " \"type\": \"keyword\"" +
+ " }," +
+ " \"e\": {" +
+ " \"type\": \"keyword\"" +
+ " }," +
+ " \"f\": {" +
+ " \"type\": \"object\"," +
+ " \"properties\": {" +
+ " \"g\": {" +
+ " \"type\": \"integer\"" +
+ " }," +
+ " \"h\": {" +
+ " \"type\": \"integer\"" +
+ " }" +
+ " } " +
+ " }" +
+ " }" +
+ " }," +
+ " \"i\": {" +
+ " \"type\": \"long\"" +
+ " }," +
+ " \"j\": {" +
+ " \"type\": \"long\"" +
+ " }," +
+ " \"k\": {" +
+ " \"type\": \"long\"" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ "}";
+
+ createIndex(indexName, mapping);
index(indexName, ImmutableMap.builder()
.put("a", ImmutableMap.builder()
@@ -266,12 +280,6 @@ public void testArrayFields()
.build())
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
assertQuery(
"SELECT a.b.y[1], c.f[1].g[2], c.f[2].g[1], j[2], k[1] FROM test_arrays",
"VALUES ('hello', 20, 30, 60, NULL)");
@@ -279,6 +287,7 @@ public void testArrayFields()
@Test
public void testEmptyObjectFields()
+ throws IOException
{
String indexName = "emptyobject";
index(indexName, ImmutableMap.builder()
@@ -288,100 +297,14 @@ public void testEmptyObjectFields()
.put("fields.fieldb", ImmutableMap.of())
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
assertQuery(
"SELECT name, fields.fielda FROM emptyobject",
"VALUES ('stringfield', 32)");
}
- @Test
- public void testNullPredicate()
- {
- String indexName = "null_predicate1";
-
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc",
- "null_keyword", "type=keyword",
- "custkey", "type=keyword")
- .get();
- index(indexName, ImmutableMap.builder()
- .put("null_keyword", 32)
- .put("custkey", 1301)
- .build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
- assertQueryReturnsEmptyResult("SELECT * FROM null_predicate1 WHERE null_keyword IS NULL");
- assertQueryReturnsEmptyResult("SELECT * FROM null_predicate1 WHERE null_keyword = '10' OR null_keyword IS NULL");
-
- assertQuery("SELECT custkey, null_keyword FROM null_predicate1 WHERE null_keyword = '32' OR null_keyword IS NULL", "VALUES (1301, 32)");
- assertQuery("SELECT custkey FROM null_predicate1 WHERE null_keyword = '32' OR null_keyword IS NULL", "VALUES (1301)");
-
- // not null filter
- // filtered column is selected
- assertQuery("SELECT custkey, null_keyword FROM null_predicate1 WHERE null_keyword IS NOT NULL", "VALUES (1301, 32)");
- assertQuery("SELECT custkey, null_keyword FROM null_predicate1 WHERE null_keyword = '32' OR null_keyword IS NOT NULL", "VALUES (1301, 32)");
-
- // filtered column is not selected
- assertQuery("SELECT custkey FROM null_predicate1 WHERE null_keyword = '32' OR null_keyword IS NOT NULL", "VALUES (1301)");
-
- indexName = "null_predicate2";
-
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc",
- "null_keyword", "type=keyword",
- "custkey", "type=keyword")
- .get();
- index(indexName, ImmutableMap.builder()
- .put("custkey", 1301)
- .build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
- // not null filter
- assertQueryReturnsEmptyResult("SELECT * FROM null_predicate2 WHERE null_keyword IS NOT NULL");
- assertQueryReturnsEmptyResult("SELECT * FROM null_predicate2 WHERE null_keyword = '10' OR null_keyword IS NOT NULL");
-
- // filtered column is selected
- assertQuery("SELECT custkey, null_keyword FROM null_predicate2 WHERE null_keyword IS NULL", "VALUES (1301, NULL)");
- assertQuery("SELECT custkey, null_keyword FROM null_predicate2 WHERE null_keyword = '32' OR null_keyword IS NULL", "VALUES (1301, NULL)");
-
- // filtered column is not selected
- assertQuery("SELECT custkey FROM null_predicate2 WHERE null_keyword = '32' OR null_keyword IS NULL", "VALUES (1301)");
-
- index(indexName, ImmutableMap.builder()
- .put("null_keyword", 32)
- .put("custkey", 1302)
- .build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
- assertQuery("SELECT custkey, null_keyword FROM null_predicate2 WHERE null_keyword = '32' OR null_keyword IS NULL", "VALUES (1301, NULL), (1302, 32)");
- assertQuery("SELECT custkey FROM null_predicate2 WHERE null_keyword = '32' OR null_keyword IS NULL", "VALUES (1301), (1302)");
- }
-
@Test
public void testNestedFields()
+ throws IOException
{
String indexName = "data";
index(indexName, ImmutableMap.builder()
@@ -390,12 +313,6 @@ public void testNestedFields()
.put("fields.fieldb", "valueb")
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
assertQuery(
"SELECT name, fields.fielda, fields.fieldb FROM data",
"VALUES ('nestfield', 32, 'valueb')");
@@ -403,6 +320,7 @@ public void testNestedFields()
@Test
public void testNestedVariants()
+ throws IOException
{
String indexName = "nested_variants";
@@ -425,12 +343,6 @@ public void testNestedVariants()
index(indexName,
ImmutableMap.of("a.b.c", "value4"));
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
assertQuery(
"SELECT a.b.c FROM nested_variants",
"VALUES 'value1', 'value2', 'value3', 'value4'");
@@ -438,26 +350,30 @@ public void testNestedVariants()
@Test
public void testDataTypes()
+ throws IOException
{
String indexName = "types";
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc",
- "boolean_column", "type=boolean",
- "float_column", "type=float",
- "double_column", "type=double",
- "integer_column", "type=integer",
- "long_column", "type=long",
- "keyword_column", "type=keyword",
- "text_column", "type=text",
- "binary_column", "type=binary",
- "timestamp_column", "type=date",
- "ipv4_column", "type=ip",
- "ipv6_column", "type=ip")
- .get();
+ String mapping = "" +
+ "{" +
+ " \"mappings\": {" +
+ " \"doc\": {" +
+ " \"properties\": {" +
+ " \"boolean_column\": { \"type\": \"boolean\" }," +
+ " \"float_column\": { \"type\": \"float\" }," +
+ " \"double_column\": { \"type\": \"double\" }," +
+ " \"integer_column\": { \"type\": \"integer\" }," +
+ " \"long_column\": { \"type\": \"long\" }," +
+ " \"keyword_column\": { \"type\": \"keyword\" }," +
+ " \"text_column\": { \"type\": \"text\" }," +
+ " \"binary_column\": { \"type\": \"binary\" }," +
+ " \"timestamp_column\": { \"type\": \"date\" }" +
+ " }" +
+ " }" +
+ " }" +
+ "}";
+
+ createIndex(indexName, mapping);
index(indexName, ImmutableMap.builder()
.put("boolean_column", true)
@@ -473,11 +389,6 @@ public void testDataTypes()
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
MaterializedResult rows = computeActual("" +
"SELECT " +
"boolean_column, " +
@@ -503,28 +414,30 @@ public void testDataTypes()
@Test
public void testFilters()
+ throws IOException
{
String indexName = "filter_pushdown";
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc",
- "boolean_column", "type=boolean",
- "byte_column", "type=byte",
- "short_column", "type=short",
- "integer_column", "type=integer",
- "long_column", "type=long",
- "float_column", "type=float",
- "double_column", "type=double",
- "keyword_column", "type=keyword",
- "text_column", "type=text",
- "binary_column", "type=binary",
- "timestamp_column", "type=date",
- "ipv4_column", "type=ip",
- "ipv6_column", "type=ip")
- .get();
+ String mapping = "" +
+ "{" +
+ " \"mappings\": {" +
+ " \"doc\": {" +
+ " \"properties\": {" +
+ " \"boolean_column\": { \"type\": \"boolean\" }," +
+ " \"float_column\": { \"type\": \"float\" }," +
+ " \"double_column\": { \"type\": \"double\" }," +
+ " \"integer_column\": { \"type\": \"integer\" }," +
+ " \"long_column\": { \"type\": \"long\" }," +
+ " \"keyword_column\": { \"type\": \"keyword\" }," +
+ " \"text_column\": { \"type\": \"text\" }," +
+ " \"binary_column\": { \"type\": \"binary\" }," +
+ " \"timestamp_column\": { \"type\": \"date\" }" +
+ " }" +
+ " }" +
+ " }" +
+ "}";
+
+ createIndex(indexName, mapping);
index(indexName, ImmutableMap.builder()
.put("boolean_column", true)
@@ -542,12 +455,6 @@ public void testFilters()
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
// boolean
assertQuery("SELECT count(*) FROM filter_pushdown WHERE boolean_column = true", "VALUES 1");
assertQuery("SELECT count(*) FROM filter_pushdown WHERE boolean_column = false", "VALUES 0");
@@ -620,35 +527,34 @@ public void testFilters()
@Test
public void testDataTypesNested()
+ throws IOException
{
String indexName = "types_nested";
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc", "" +
- "{ " +
- " \"properties\": {\n" +
- " \"field\": {\n" +
- " \"properties\": {\n" +
- " \"boolean_column\": { \"type\": \"boolean\" },\n" +
- " \"float_column\": { \"type\": \"float\" },\n" +
- " \"double_column\": { \"type\": \"double\" },\n" +
- " \"integer_column\": { \"type\": \"integer\" },\n" +
- " \"long_column\": { \"type\": \"long\" },\n" +
- " \"keyword_column\": { \"type\": \"keyword\" },\n" +
- " \"text_column\": { \"type\": \"text\" },\n" +
- " \"binary_column\": { \"type\": \"binary\" },\n" +
- " \"timestamp_column\": { \"type\": \"date\" },\n" +
- " \"ipv4_column\": { \"type\": \"ip\" },\n" +
- " \"ipv6_column\": { \"type\": \"ip\" }\n" +
- " }\n" +
- " }\n" +
- " }" +
- "}\n",
- XContentType.JSON)
- .get();
+ String mapping = "" +
+ "{" +
+ " \"mappings\": {" +
+ " \"doc\": {" +
+ " \"properties\": {" +
+ " \"field\": {" +
+ " \"properties\": {" +
+ " \"boolean_column\": { \"type\": \"boolean\" }," +
+ " \"float_column\": { \"type\": \"float\" }," +
+ " \"double_column\": { \"type\": \"double\" }," +
+ " \"integer_column\": { \"type\": \"integer\" }," +
+ " \"long_column\": { \"type\": \"long\" }," +
+ " \"keyword_column\": { \"type\": \"keyword\" }," +
+ " \"text_column\": { \"type\": \"text\" }," +
+ " \"binary_column\": { \"type\": \"binary\" }," +
+ " \"timestamp_column\": { \"type\": \"date\" }" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ "}";
+
+ createIndex(indexName, mapping);
index(indexName, ImmutableMap.of(
"field",
@@ -666,12 +572,6 @@ public void testDataTypesNested()
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build()));
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
MaterializedResult rows = computeActual("" +
"SELECT " +
"field.boolean_column, " +
@@ -697,36 +597,37 @@ public void testDataTypesNested()
@Test
public void testNestedTypeDataTypesNested()
+ throws IOException
{
String indexName = "nested_type_nested";
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc", "" +
- "{ " +
- " \"properties\": {\n" +
- " \"nested_field\": {" +
- " \"type\":\"nested\"," +
- " \"properties\": {" +
- " \"boolean_column\": { \"type\": \"boolean\" },\n" +
- " \"float_column\": { \"type\": \"float\" },\n" +
- " \"double_column\": { \"type\": \"double\" },\n" +
- " \"integer_column\": { \"type\": \"integer\" }\n," +
- " \"long_column\": { \"type\": \"long\" },\n" +
- " \"keyword_column\": { \"type\": \"keyword\" },\n" +
- " \"text_column\": { \"type\": \"text\" },\n" +
- " \"binary_column\": { \"type\": \"binary\" },\n" +
- " \"timestamp_column\": { \"type\": \"date\" },\n" +
- " \"ipv4_column\": { \"type\": \"ip\" },\n" +
- " \"ipv6_column\": { \"type\": \"ip\" }\n" +
- " }\n" +
- " }\n" +
- " }" +
- "}\n",
- XContentType.JSON)
- .get();
+ String mapping = "" +
+ "{" +
+ " \"mappings\": {" +
+ " \"doc\": {" +
+ " \"properties\": {" +
+ " \"nested_field\": {" +
+ " \"type\":\"nested\"," +
+ " \"properties\": {" +
+ " \"boolean_column\": { \"type\": \"boolean\" }," +
+ " \"float_column\": { \"type\": \"float\" }," +
+ " \"double_column\": { \"type\": \"double\" }," +
+ " \"integer_column\": { \"type\": \"integer\" }," +
+ " \"long_column\": { \"type\": \"long\" }," +
+ " \"keyword_column\": { \"type\": \"keyword\" }," +
+ " \"text_column\": { \"type\": \"text\" }," +
+ " \"binary_column\": { \"type\": \"binary\" }," +
+ " \"timestamp_column\": { \"type\": \"date\" }," +
+ " \"ipv4_column\": { \"type\": \"ip\" }," +
+ " \"ipv6_column\": { \"type\": \"ip\" }" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ " }" +
+ "}";
+
+ createIndex(indexName, mapping);
index(indexName, ImmutableMap.of(
"nested_field",
@@ -744,12 +645,6 @@ public void testNestedTypeDataTypesNested()
.put("ipv6_column", "2001:db8:0:1:1:1:1:1")
.build()));
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
MaterializedResult rows = computeActual("" +
"SELECT " +
"nested_field.boolean_column, " +
@@ -787,6 +682,7 @@ public void testQueryString()
@Test
public void testMixedCase()
+ throws IOException
{
String indexName = "mixed_case";
index(indexName, ImmutableMap.builder()
@@ -794,12 +690,6 @@ public void testMixedCase()
.put("AGE", 32)
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
assertQuery(
"SELECT name, age FROM mixed_case",
"VALUES ('john', 32)");
@@ -814,29 +704,16 @@ public void testNumericKeyword()
throws IOException
{
String indexName = "numeric_keyword";
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .prepareCreate(indexName)
- .addMapping("doc",
- "numeric_column", "type=keyword")
- .get();
index(indexName, ImmutableMap.builder()
.put("numeric_column", 20)
.build());
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest(indexName))
- .actionGet();
-
assertQuery(
"SELECT numeric_column FROM numeric_keyword",
"VALUES 20");
assertQuery(
- "SELECT numeric_column FROM numeric_keyword where numeric_column = '20'",
+ "SELECT numeric_column FROM numeric_keyword where numeric_column = 20",
"VALUES 20");
}
@@ -849,74 +726,35 @@ public void testQueryStringError()
@Test
public void testAlias()
+ throws IOException
{
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .aliases(indexAliasesRequest()
- .addAliasAction(IndicesAliasesRequest.AliasActions.add()
- .index("orders")
- .alias("orders_alias")))
- .actionGet();
-
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest("orders_alias"))
- .actionGet();
+ addAlias("orders", "orders_alias");
assertQuery(
"SELECT count(*) FROM orders_alias",
"SELECT count(*) FROM orders");
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .aliases(indexAliasesRequest()
- .addAliasAction(IndicesAliasesRequest.AliasActions.remove()
- .index("orders")
- .alias("orders_alias")))
- .actionGet();
+ removeAlias("orders", "orders_alias");
}
@Test(enabled = false)
public void testMultiIndexAlias()
+ throws IOException
{
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .aliases(indexAliasesRequest()
- .addAliasAction(IndicesAliasesRequest.AliasActions.add()
- .index("nation")
- .alias("multi_alias")))
- .actionGet();
-
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .aliases(indexAliasesRequest()
- .addAliasAction(IndicesAliasesRequest.AliasActions.add()
- .index("region")
- .alias("multi_alias")))
- .actionGet();
-
- embeddedElasticsearchNode.getClient()
- .admin()
- .indices()
- .refresh(refreshRequest("multi_alias"))
- .actionGet();
+ addAlias("nation", "multi_alias");
+ addAlias("region", "multi_alias");
assertQuery(
"SELECT count(*) FROM multi_alias",
"SELECT (SELECT count(*) FROM region) + (SELECT count(*) FROM nation)");
}
- private void index(String indexName, Map document)
+ private void index(String index, Map document)
+ throws IOException
{
- embeddedElasticsearchNode.getClient()
- .prepareIndex(indexName, "doc")
- .setSource(document)
- .get();
+ client.index(new IndexRequest(index, "doc")
+ .source(document)
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE));
}
@Test
@@ -946,4 +784,25 @@ public void testPassthroughQuery()
format("SELECT * FROM \"orders$query:%s\"", BaseEncoding.base32().encode("invalid json".getBytes(UTF_8))),
"Elasticsearch query for 'orders' is not valid JSON");
}
+
+ private void addAlias(String index, String alias)
+ throws IOException
+ {
+ client.getLowLevelClient()
+ .performRequest("PUT", format("/%s/_alias/%s", index, alias));
+ }
+
+ private void removeAlias(String index, String alias)
+ throws IOException
+ {
+ client.getLowLevelClient()
+ .performRequest("DELETE", format("/%s/_alias/%s", index, alias));
+ }
+
+ private void createIndex(String indexName, @Language("JSON") String mapping)
+ throws IOException
+ {
+ client.getLowLevelClient()
+ .performRequest("PUT", "/" + indexName, ImmutableMap.of(), new NStringEntity(mapping, ContentType.APPLICATION_JSON));
+ }
}
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestPasswordAuthentication.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestPasswordAuthentication.java
new file mode 100644
index 0000000000000..f13c60aaa3d7e
--- /dev/null
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestPasswordAuthentication.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch;
+
+import com.amazonaws.util.Base64;
+import com.facebook.presto.sql.query.QueryAssertions;
+import com.facebook.presto.tests.DistributedQueryRunner;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Resources;
+import com.google.common.net.HostAndPort;
+import org.apache.http.HttpHost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static com.facebook.presto.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
+import static com.google.common.io.Resources.getResource;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TestPasswordAuthentication
+{
+ // We use 7.8.0 because security became a non-commercial feature in recent versions
+ private final String elasticsearchImage = "elasticsearch:7.8.0";
+ private static final String USER = "elastic_user";
+ private static final String PASSWORD = "123456";
+
+ private final ElasticsearchServer elasticsearch;
+ private final RestHighLevelClient client;
+ private final QueryAssertions assertions;
+
+ public TestPasswordAuthentication()
+ throws Exception
+ {
+ elasticsearch = new ElasticsearchServer(elasticsearchImage, ImmutableMap.builder()
+ .put("elasticsearch.yml", loadResource("elasticsearch.yml"))
+ .put("users", loadResource("users"))
+ .put("users_roles", loadResource("users_roles"))
+ .put("roles.yml", loadResource("roles.yml"))
+ .build());
+
+ HostAndPort address = elasticsearch.getAddress();
+ client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort())));
+
+ DistributedQueryRunner runner = createElasticsearchQueryRunner(
+ elasticsearch.getAddress(),
+ ImmutableList.of(),
+ ImmutableMap.of(),
+ ImmutableMap.builder()
+ .put("elasticsearch.security", "PASSWORD")
+ .put("elasticsearch.auth.user", USER)
+ .put("elasticsearch.auth.password", PASSWORD)
+ .build());
+
+ assertions = new QueryAssertions(runner);
+ }
+
+ @AfterClass(alwaysRun = true)
+ public final void destroy()
+ throws IOException
+ {
+ assertions.close();
+ elasticsearch.stop();
+ client.close();
+ }
+
+ @Test
+ public void test()
+ throws IOException
+ {
+ String json = new ObjectMapper().writeValueAsString(ImmutableMap.builder()
+ .put("value", 42L)
+ .build());
+
+ client.getLowLevelClient()
+ .performRequest(
+ "POST",
+ "/test/_doc?refresh",
+ ImmutableMap.of(),
+ new NStringEntity(json, ContentType.APPLICATION_JSON),
+ new BasicHeader("Authorization", format("Basic %s", Base64.encodeAsString(format("%s:%s", USER, PASSWORD).getBytes(StandardCharsets.UTF_8)))));
+
+ assertions.assertQuery("SELECT * FROM test",
+ "VALUES BIGINT '42'");
+ }
+
+ private static String loadResource(String file)
+ throws IOException
+ {
+ return Resources.toString(getResource(file), UTF_8);
+ }
+}
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestPasswordConfig.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestPasswordConfig.java
new file mode 100644
index 0000000000000..e4fa4fccc5190
--- /dev/null
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/TestPasswordConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.elasticsearch;
+
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+
+public class TestPasswordConfig
+{
+ @Test
+ public void testDefaults()
+ {
+ assertRecordedDefaults(recordDefaults(PasswordConfig.class)
+ .setUser(null)
+ .setPassword(null));
+ }
+
+ @Test
+ public void testExplicitPropertyMappings()
+ {
+ Map properties = new ImmutableMap.Builder()
+ .put("elasticsearch.auth.user", "user")
+ .put("elasticsearch.auth.password", "password")
+ .build();
+
+ PasswordConfig expected = new PasswordConfig()
+ .setUser("user")
+ .setPassword("password");
+
+ assertFullMapping(properties, expected);
+ }
+}
diff --git a/presto-elasticsearch/src/test/resources/elasticsearch.yml b/presto-elasticsearch/src/test/resources/elasticsearch.yml
new file mode 100644
index 0000000000000..dab862554754d
--- /dev/null
+++ b/presto-elasticsearch/src/test/resources/elasticsearch.yml
@@ -0,0 +1,4 @@
+cluster.name: "docker-cluster"
+network.host: 0.0.0.0
+
+xpack.security.enabled: true
\ No newline at end of file
diff --git a/presto-elasticsearch/src/test/resources/roles.yml b/presto-elasticsearch/src/test/resources/roles.yml
new file mode 100644
index 0000000000000..ee8750c9b0384
--- /dev/null
+++ b/presto-elasticsearch/src/test/resources/roles.yml
@@ -0,0 +1,6 @@
+admin:
+ cluster:
+ - all
+ indices:
+ - names: '*'
+ privileges: [ all ]
\ No newline at end of file
diff --git a/presto-elasticsearch/src/test/resources/users b/presto-elasticsearch/src/test/resources/users
new file mode 100644
index 0000000000000..6cdaed94971e3
--- /dev/null
+++ b/presto-elasticsearch/src/test/resources/users
@@ -0,0 +1 @@
+elastic_user:$2a$10$tbO62EbOfqMezJDBDWlxbuvIleeYeNlw30F5OgWMXzi1R8aXqnVni
\ No newline at end of file
diff --git a/presto-elasticsearch/src/test/resources/users_roles b/presto-elasticsearch/src/test/resources/users_roles
new file mode 100644
index 0000000000000..3e263d0cc4882
--- /dev/null
+++ b/presto-elasticsearch/src/test/resources/users_roles
@@ -0,0 +1 @@
+admin:elastic_user
\ No newline at end of file
diff --git a/presto-main/src/test/java/com/facebook/presto/sql/query/QueryAssertions.java b/presto-main/src/test/java/com/facebook/presto/sql/query/QueryAssertions.java
index 41d06b76bdc9a..8f765ed6010bf 100644
--- a/presto-main/src/test/java/com/facebook/presto/sql/query/QueryAssertions.java
+++ b/presto-main/src/test/java/com/facebook/presto/sql/query/QueryAssertions.java
@@ -33,10 +33,11 @@
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.base.Strings.nullToEmpty;
import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
-class QueryAssertions
+public class QueryAssertions
implements Closeable
{
protected QueryRunner runner;
@@ -60,7 +61,12 @@ public QueryAssertions(Map systemProperties)
public QueryAssertions(Session session)
{
- runner = new LocalQueryRunner(session);
+ this(new LocalQueryRunner(requireNonNull(session, "session is null")));
+ }
+
+ public QueryAssertions(QueryRunner runner)
+ {
+ this.runner = requireNonNull(runner, "runner is null");
}
public QueryRunner getQueryRunner()