Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ Configuration properties
- Disables using the address published by Elasticsearch to connect for
queries.
-
* - ``elasticsearch.legacy-pass-through-query.enabled``
- Enables legacy pass-through query
- false

TLS security
------------
Expand Down Expand Up @@ -379,6 +382,13 @@ Elasticsearch Trino Supports
Pass-through queries
--------------------

.. note::

This feature is deprecated and disabled by default. It's recommended to use
``raw_query`` :doc:`table function</functions/table>` instead.
To enable legacy pass-through query please use
``elasticsearch.legacy-pass-through-query-enabled`` configuration property.

The Elasticsearch connector allows you to embed any valid Elasticsearch query,
that uses the `Elasticsearch Query DSL
<https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_
Expand Down
1 change: 1 addition & 0 deletions plugin/trino-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.elasticsearch.version>6.8.23</dep.elasticsearch.version>
<air.test.parallel>instances</air.test.parallel>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public enum Security

private Security security;

private boolean legacyPassThroughQueryEnabled;

@NotNull
public List<String> getHosts()
{
Expand Down Expand Up @@ -358,4 +360,17 @@ public ElasticsearchConfig setSecurity(Security security)
this.security = security;
return this;
}

public boolean isLegacyPassThroughQueryEnabled()
{
return legacyPassThroughQueryEnabled;
}

@Config("elasticsearch.legacy-pass-through-query.enabled")
@ConfigDescription("Enable legacy Elasticsearch pass-through query")
public ElasticsearchConfig setLegacyPassThroughQueryEnabled(boolean legacyPassThroughQuery)
{
this.legacyPassThroughQueryEnabled = legacyPassThroughQuery;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static io.trino.plugin.elasticsearch.ElasticsearchTableHandle.Type.SCAN;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static io.trino.spi.StandardErrorCode.UNSUPPORTED_SUBQUERY;
import static io.trino.spi.expression.StandardFunctions.LIKE_FUNCTION_NAME;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
Expand Down Expand Up @@ -141,13 +142,15 @@ public class ElasticsearchMetadata
private final Type ipAddressType;
private final ElasticsearchClient client;
private final String schemaName;
private final boolean legacyPassThroughQueryEnabled;

@Inject
public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client, ElasticsearchConfig config)
{
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
this.client = requireNonNull(client, "client is null");
this.schemaName = config.getDefaultSchema();
this.legacyPassThroughQueryEnabled = config.isLegacyPassThroughQueryEnabled();
}

@Override
Expand All @@ -170,6 +173,11 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT
// TODO this query pass-through mechanism is deprecated in favor of the `raw_query` table function.
// it should be eventually removed: https://github.com/trinodb/trino/issues/13050
if (table.endsWith(PASSTHROUGH_QUERY_SUFFIX)) {
if (!this.legacyPassThroughQueryEnabled) {
throw new TrinoException(
UNSUPPORTED_SUBQUERY,
"Pass-through query not supported. Please turn it on explicitly using elasticsearch.legacy-pass-through-query.enabled feature toggle");
}
table = table.substring(0, table.length() - PASSTHROUGH_QUERY_SUFFIX.length());
byte[] decoded;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ public abstract class BaseElasticsearchConnectorTest
extends BaseConnectorTest
{
private final String image;
private final String catalogName;
private ElasticsearchServer elasticsearch;
protected RestHighLevelClient client;

BaseElasticsearchConnectorTest(String image)
BaseElasticsearchConnectorTest(String image, String catalogName)
{
this.image = image;
this.catalogName = catalogName;
}

@Override
Expand All @@ -76,7 +78,13 @@ protected QueryRunner createQueryRunner()
HostAndPort address = elasticsearch.getAddress();
client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort())));

return createElasticsearchQueryRunner(elasticsearch.getAddress(), TpchTable.getTables(), ImmutableMap.of(), ImmutableMap.of(), 3);
return createElasticsearchQueryRunner(
elasticsearch.getAddress(),
TpchTable.getTables(),
ImmutableMap.of(),
ImmutableMap.of("elasticsearch.legacy-pass-through-query.enabled", "true"),
3,
catalogName);
}

@Override
Expand Down Expand Up @@ -147,8 +155,8 @@ public void testWithoutBackpressure()
{
assertQuerySucceeds("SELECT * FROM orders");
// Check that JMX stats show no sign of backpressure
assertQueryReturnsEmptyResult("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*\" WHERE \"backpressurestats.alltime.count\" > 0");
assertQueryReturnsEmptyResult("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*\" WHERE \"backpressurestats.alltime.max\" > 0");
assertQueryReturnsEmptyResult(format("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*name=%s*\" WHERE \"backpressurestats.alltime.count\" > 0", catalogName));
assertQueryReturnsEmptyResult(format("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*name=%s*\" WHERE \"backpressurestats.alltime.max\" > 0", catalogName));
}

@Test
Expand Down Expand Up @@ -209,7 +217,7 @@ public void testSortItemsReflectedInExplain()
public void testShowCreateTable()
{
assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue())
.isEqualTo("CREATE TABLE elasticsearch.tpch.orders (\n" +
.isEqualTo(format("CREATE TABLE %s.tpch.orders (\n", catalogName) +
" clerk varchar,\n" +
" comment varchar,\n" +
" custkey bigint,\n" +
Expand Down Expand Up @@ -1813,7 +1821,7 @@ public void testPassthroughQuery()
"FROM data", BaseEncoding.base32().encode(query.getBytes(UTF_8)))))
.matches(format("WITH data(r) AS (" +
" SELECT CAST(json_parse(result) AS ROW(aggregations ROW(max_orderkey ROW(value BIGINT), sum_orderkey ROW(value BIGINT)))) " +
" FROM TABLE(elasticsearch.system.raw_query(" +
format(" FROM TABLE(%s.system.raw_query(", catalogName) +
" schema => 'tpch', " +
" index => 'orders', " +
" query => '%s'))) " +
Expand Down Expand Up @@ -1882,7 +1890,7 @@ public void testQueryTableFunction()
{
// select single record
assertQuery("SELECT json_query(result, 'lax $[0][0].hits.hits._source') " +
"FROM TABLE(elasticsearch.system.raw_query(" +
format("FROM TABLE(%s.system.raw_query(", catalogName) +
"schema => 'tpch', " +
"index => 'nation', " +
"query => '{\"query\": {\"match\": {\"name\": \"ALGERIA\"}}}')) t(result)",
Expand All @@ -1892,7 +1900,7 @@ public void testQueryTableFunction()
Session session = Session.builder(getSession())
.addPreparedStatement(
"my_query",
"SELECT json_query(result, 'lax $[0][0].hits.hits._source') FROM TABLE(elasticsearch.system.raw_query(schema => ?, index => ?, query => ?))")
format("SELECT json_query(result, 'lax $[0][0].hits.hits._source') FROM TABLE(%s.system.raw_query(schema => ?, index => ?, query => ?))", catalogName))
.build();
assertQuery(
session,
Expand All @@ -1901,23 +1909,23 @@ public void testQueryTableFunction()

// select multiple records by range. Use array wrapper to wrap multiple results
assertQuery("SELECT array_sort(CAST(json_parse(json_query(result, 'lax $[0][0].hits.hits._source.name' WITH ARRAY WRAPPER)) AS array(varchar))) " +
"FROM TABLE(elasticsearch.system.raw_query(" +
format("FROM TABLE(%s.system.raw_query(", catalogName) +
"schema => 'tpch', " +
"index => 'nation', " +
"query => '{\"query\": {\"range\": {\"nationkey\": {\"gte\": 0,\"lte\": 3}}}}')) t(result)",
"VALUES ARRAY['ALGERIA', 'ARGENTINA', 'BRAZIL', 'CANADA']");

// no matches
assertQuery("SELECT json_query(result, 'lax $[0][0].hits.hits') " +
"FROM TABLE(elasticsearch.system.raw_query(" +
format("FROM TABLE(%s.system.raw_query(", catalogName) +
"schema => 'tpch', " +
"index => 'nation', " +
"query => '{\"query\": {\"match\": {\"name\": \"UTOPIA\"}}}')) t(result)",
"VALUES '[]'");

// syntax error
assertThatThrownBy(() -> query("SELECT * " +
"FROM TABLE(elasticsearch.system.raw_query(" +
format("FROM TABLE(%s.system.raw_query(", catalogName) +
"schema => 'tpch', " +
"index => 'nation', " +
"query => 'wrong syntax')) t(result)"))
Expand All @@ -1928,7 +1936,7 @@ protected void assertTableDoesNotExist(String name)
{
assertQueryReturnsEmptyResult(format("SELECT * FROM information_schema.columns WHERE table_name = '%s'", name));
assertFalse(computeActual("SHOW TABLES").getOnlyColumnAsSet().contains(name));
assertQueryFails("SELECT * FROM " + name, ".*Table 'elasticsearch.tpch." + name + "' does not exist");
assertQueryFails("SELECT * FROM " + name, ".*Table '" + catalogName + ".tpch." + name + "' does not exist");
}

protected abstract String indexEndpoint(String index, String docId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.elasticsearch;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.IOException;

import static io.trino.plugin.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
import static io.trino.plugin.elasticsearch.ElasticsearchServer.ELASTICSEARCH_7_IMAGE;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

public class TestDisabledLegacyPassThroughQuery
extends AbstractTestQueryFramework
{
private ElasticsearchServer elasticsearch;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
elasticsearch = new ElasticsearchServer(ELASTICSEARCH_7_IMAGE, ImmutableMap.of());

return createElasticsearchQueryRunner(
elasticsearch.getAddress(),
TpchTable.getTables(),
ImmutableMap.of(),
ImmutableMap.of(),
1,
"elasticsearch-legacy-pass-through-query");
}

@AfterClass(alwaysRun = true)
public final void destroy()
throws IOException
{
elasticsearch.stop();
}

@Test
public void testDisabledPassthroughQuery()
{
@Language("JSON")
String query = "{\n" +
" \"size\": 0,\n" +
" \"aggs\" : {\n" +
" \"max_orderkey\" : { \"max\" : { \"field\" : \"orderkey\" } },\n" +
" \"sum_orderkey\" : { \"sum\" : { \"field\" : \"orderkey\" } }\n" +
" }\n" +
"}";

assertQueryFails(
format("WITH data(r) AS (" +
" SELECT CAST(json_parse(result) AS ROW(aggregations ROW(max_orderkey ROW(value BIGINT), sum_orderkey ROW(value BIGINT)))) " +
" FROM \"orders$query:%s\") " +
"SELECT r.aggregations.max_orderkey.value, r.aggregations.sum_orderkey.value " +
"FROM data", BaseEncoding.base32().encode(query.getBytes(UTF_8))),
"Pass-through query not supported\\. Please turn it on explicitly using elasticsearch\\.legacy-pass-through-query\\.enabled feature toggle");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class TestElasticsearch6ConnectorTest
{
public TestElasticsearch6ConnectorTest()
{
super("docker.elastic.co/elasticsearch/elasticsearch-oss:6.6.0");
super("docker.elastic.co/elasticsearch/elasticsearch-oss:6.6.0", "elasticsearch6");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class TestElasticsearch7ConnectorTest
{
public TestElasticsearch7ConnectorTest()
{
super(ELASTICSEARCH_7_IMAGE);
super(ELASTICSEARCH_7_IMAGE, "elasticsearch7");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void testDefaults()
.setTruststorePassword(null)
.setVerifyHostnames(true)
.setIgnorePublishAddress(false)
.setSecurity(null));
.setSecurity(null)
.setLegacyPassThroughQueryEnabled(false));
}

@Test
Expand Down Expand Up @@ -89,6 +90,7 @@ public void testExplicitPropertyMappings()
.put("elasticsearch.tls.verify-hostnames", "false")
.put("elasticsearch.ignore-publish-address", "true")
.put("elasticsearch.security", "AWS")
.put("elasticsearch.legacy-pass-through-query.enabled", "true")
.buildOrThrow();

ElasticsearchConfig expected = new ElasticsearchConfig()
Expand All @@ -112,7 +114,8 @@ public void testExplicitPropertyMappings()
.setTruststorePassword("truststore-password")
.setVerifyHostnames(false)
.setIgnorePublishAddress(true)
.setSecurity(AWS);
.setSecurity(AWS)
.setLegacyPassThroughQueryEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class TestElasticsearchOpenSearchConnectorTest
public TestElasticsearchOpenSearchConnectorTest()
{
// 1.0.0 and 1.0.1 causes NotSslRecordException during the initialization
super("opensearchproject/opensearch:1.1.0");
super("opensearchproject/opensearch:1.1.0", "opensearch");
}

@Override
Expand Down