diff --git a/docs/src/main/sphinx/connector/elasticsearch.rst b/docs/src/main/sphinx/connector/elasticsearch.rst index e463525f9a26..ea1b2cf70984 100644 --- a/docs/src/main/sphinx/connector/elasticsearch.rst +++ b/docs/src/main/sphinx/connector/elasticsearch.rst @@ -83,6 +83,9 @@ Configuration properties - Disables using the address published by Elasticsearch to connect for queries. - + * - ``elasticsearch.legacy-pass-through-query.enabled`` + - Enables legacy pass-through query + - false TLS security ------------ @@ -379,6 +382,13 @@ Elasticsearch Trino Supports Pass-through queries -------------------- +.. note:: + + This feature is deprecated and disabled by default. It's recommended to use + ``raw_query`` :doc:`table function` instead. + To enable legacy pass-through query please use + ``elasticsearch.legacy-pass-through-query-enabled`` configuration property. + The Elasticsearch connector allows you to embed any valid Elasticsearch query, that uses the `Elasticsearch Query DSL `_ diff --git a/plugin/trino-elasticsearch/pom.xml b/plugin/trino-elasticsearch/pom.xml index fee54767d006..97cbd239ccbc 100644 --- a/plugin/trino-elasticsearch/pom.xml +++ b/plugin/trino-elasticsearch/pom.xml @@ -16,6 +16,7 @@ ${project.parent.basedir} 6.8.23 + instances diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java index ea3e7cb8f755..0a8ec8de2b43 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java @@ -79,6 +79,8 @@ public enum Security private Security security; + private boolean legacyPassThroughQueryEnabled; + @NotNull public List getHosts() { @@ -358,4 +360,17 @@ public ElasticsearchConfig setSecurity(Security security) this.security = security; return this; } + + public boolean isLegacyPassThroughQueryEnabled() + { + return legacyPassThroughQueryEnabled; + } + + @Config("elasticsearch.legacy-pass-through-query.enabled") + @ConfigDescription("Enable legacy Elasticsearch pass-through query") + public ElasticsearchConfig setLegacyPassThroughQueryEnabled(boolean legacyPassThroughQuery) + { + this.legacyPassThroughQueryEnabled = legacyPassThroughQuery; + return this; + } } diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java index c9ad304daac3..3f06ed51981b 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java @@ -95,6 +95,7 @@ import static io.trino.plugin.elasticsearch.ElasticsearchTableHandle.Type.SCAN; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; +import static io.trino.spi.StandardErrorCode.UNSUPPORTED_SUBQUERY; import static io.trino.spi.expression.StandardFunctions.LIKE_FUNCTION_NAME; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; @@ -141,6 +142,7 @@ public class ElasticsearchMetadata private final Type ipAddressType; private final ElasticsearchClient client; private final String schemaName; + private final boolean legacyPassThroughQueryEnabled; @Inject public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client, ElasticsearchConfig config) @@ -148,6 +150,7 @@ public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS)); this.client = requireNonNull(client, "client is null"); this.schemaName = config.getDefaultSchema(); + this.legacyPassThroughQueryEnabled = config.isLegacyPassThroughQueryEnabled(); } @Override @@ -170,6 +173,11 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT // TODO this query pass-through mechanism is deprecated in favor of the `raw_query` table function. // it should be eventually removed: https://github.com/trinodb/trino/issues/13050 if (table.endsWith(PASSTHROUGH_QUERY_SUFFIX)) { + if (!this.legacyPassThroughQueryEnabled) { + throw new TrinoException( + UNSUPPORTED_SUBQUERY, + "Pass-through query not supported. Please turn it on explicitly using elasticsearch.legacy-pass-through-query.enabled feature toggle"); + } table = table.substring(0, table.length() - PASSTHROUGH_QUERY_SUFFIX.length()); byte[] decoded; try { diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java index d0bd372681db..c439c1ea270e 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java @@ -59,12 +59,14 @@ public abstract class BaseElasticsearchConnectorTest extends BaseConnectorTest { private final String image; + private final String catalogName; private ElasticsearchServer elasticsearch; protected RestHighLevelClient client; - BaseElasticsearchConnectorTest(String image) + BaseElasticsearchConnectorTest(String image, String catalogName) { this.image = image; + this.catalogName = catalogName; } @Override @@ -76,7 +78,13 @@ protected QueryRunner createQueryRunner() HostAndPort address = elasticsearch.getAddress(); client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort()))); - return createElasticsearchQueryRunner(elasticsearch.getAddress(), TpchTable.getTables(), ImmutableMap.of(), ImmutableMap.of(), 3); + return createElasticsearchQueryRunner( + elasticsearch.getAddress(), + TpchTable.getTables(), + ImmutableMap.of(), + ImmutableMap.of("elasticsearch.legacy-pass-through-query.enabled", "true"), + 3, + catalogName); } @Override @@ -147,8 +155,8 @@ public void testWithoutBackpressure() { assertQuerySucceeds("SELECT * FROM orders"); // Check that JMX stats show no sign of backpressure - assertQueryReturnsEmptyResult("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*\" WHERE \"backpressurestats.alltime.count\" > 0"); - assertQueryReturnsEmptyResult("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*\" WHERE \"backpressurestats.alltime.max\" > 0"); + assertQueryReturnsEmptyResult(format("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*name=%s*\" WHERE \"backpressurestats.alltime.count\" > 0", catalogName)); + assertQueryReturnsEmptyResult(format("SELECT 1 FROM jmx.current.\"trino.plugin.elasticsearch.client:*name=%s*\" WHERE \"backpressurestats.alltime.max\" > 0", catalogName)); } @Test @@ -209,7 +217,7 @@ public void testSortItemsReflectedInExplain() public void testShowCreateTable() { assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue()) - .isEqualTo("CREATE TABLE elasticsearch.tpch.orders (\n" + + .isEqualTo(format("CREATE TABLE %s.tpch.orders (\n", catalogName) + " clerk varchar,\n" + " comment varchar,\n" + " custkey bigint,\n" + @@ -1813,7 +1821,7 @@ public void testPassthroughQuery() "FROM data", BaseEncoding.base32().encode(query.getBytes(UTF_8))))) .matches(format("WITH data(r) AS (" + " SELECT CAST(json_parse(result) AS ROW(aggregations ROW(max_orderkey ROW(value BIGINT), sum_orderkey ROW(value BIGINT)))) " + - " FROM TABLE(elasticsearch.system.raw_query(" + + format(" FROM TABLE(%s.system.raw_query(", catalogName) + " schema => 'tpch', " + " index => 'orders', " + " query => '%s'))) " + @@ -1882,7 +1890,7 @@ public void testQueryTableFunction() { // select single record assertQuery("SELECT json_query(result, 'lax $[0][0].hits.hits._source') " + - "FROM TABLE(elasticsearch.system.raw_query(" + + format("FROM TABLE(%s.system.raw_query(", catalogName) + "schema => 'tpch', " + "index => 'nation', " + "query => '{\"query\": {\"match\": {\"name\": \"ALGERIA\"}}}')) t(result)", @@ -1892,7 +1900,7 @@ public void testQueryTableFunction() Session session = Session.builder(getSession()) .addPreparedStatement( "my_query", - "SELECT json_query(result, 'lax $[0][0].hits.hits._source') FROM TABLE(elasticsearch.system.raw_query(schema => ?, index => ?, query => ?))") + format("SELECT json_query(result, 'lax $[0][0].hits.hits._source') FROM TABLE(%s.system.raw_query(schema => ?, index => ?, query => ?))", catalogName)) .build(); assertQuery( session, @@ -1901,7 +1909,7 @@ public void testQueryTableFunction() // select multiple records by range. Use array wrapper to wrap multiple results assertQuery("SELECT array_sort(CAST(json_parse(json_query(result, 'lax $[0][0].hits.hits._source.name' WITH ARRAY WRAPPER)) AS array(varchar))) " + - "FROM TABLE(elasticsearch.system.raw_query(" + + format("FROM TABLE(%s.system.raw_query(", catalogName) + "schema => 'tpch', " + "index => 'nation', " + "query => '{\"query\": {\"range\": {\"nationkey\": {\"gte\": 0,\"lte\": 3}}}}')) t(result)", @@ -1909,7 +1917,7 @@ public void testQueryTableFunction() // no matches assertQuery("SELECT json_query(result, 'lax $[0][0].hits.hits') " + - "FROM TABLE(elasticsearch.system.raw_query(" + + format("FROM TABLE(%s.system.raw_query(", catalogName) + "schema => 'tpch', " + "index => 'nation', " + "query => '{\"query\": {\"match\": {\"name\": \"UTOPIA\"}}}')) t(result)", @@ -1917,7 +1925,7 @@ public void testQueryTableFunction() // syntax error assertThatThrownBy(() -> query("SELECT * " + - "FROM TABLE(elasticsearch.system.raw_query(" + + format("FROM TABLE(%s.system.raw_query(", catalogName) + "schema => 'tpch', " + "index => 'nation', " + "query => 'wrong syntax')) t(result)")) @@ -1928,7 +1936,7 @@ protected void assertTableDoesNotExist(String name) { assertQueryReturnsEmptyResult(format("SELECT * FROM information_schema.columns WHERE table_name = '%s'", name)); assertFalse(computeActual("SHOW TABLES").getOnlyColumnAsSet().contains(name)); - assertQueryFails("SELECT * FROM " + name, ".*Table 'elasticsearch.tpch." + name + "' does not exist"); + assertQueryFails("SELECT * FROM " + name, ".*Table '" + catalogName + ".tpch." + name + "' does not exist"); } protected abstract String indexEndpoint(String index, String docId); diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestDisabledLegacyPassThroughQuery.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestDisabledLegacyPassThroughQuery.java new file mode 100644 index 000000000000..2a790f653ed9 --- /dev/null +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestDisabledLegacyPassThroughQuery.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.elasticsearch; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.BaseEncoding; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static io.trino.plugin.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner; +import static io.trino.plugin.elasticsearch.ElasticsearchServer.ELASTICSEARCH_7_IMAGE; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TestDisabledLegacyPassThroughQuery + extends AbstractTestQueryFramework +{ + private ElasticsearchServer elasticsearch; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + elasticsearch = new ElasticsearchServer(ELASTICSEARCH_7_IMAGE, ImmutableMap.of()); + + return createElasticsearchQueryRunner( + elasticsearch.getAddress(), + TpchTable.getTables(), + ImmutableMap.of(), + ImmutableMap.of(), + 1, + "elasticsearch-legacy-pass-through-query"); + } + + @AfterClass(alwaysRun = true) + public final void destroy() + throws IOException + { + elasticsearch.stop(); + } + + @Test + public void testDisabledPassthroughQuery() + { + @Language("JSON") + String query = "{\n" + + " \"size\": 0,\n" + + " \"aggs\" : {\n" + + " \"max_orderkey\" : { \"max\" : { \"field\" : \"orderkey\" } },\n" + + " \"sum_orderkey\" : { \"sum\" : { \"field\" : \"orderkey\" } }\n" + + " }\n" + + "}"; + + assertQueryFails( + format("WITH data(r) AS (" + + " SELECT CAST(json_parse(result) AS ROW(aggregations ROW(max_orderkey ROW(value BIGINT), sum_orderkey ROW(value BIGINT)))) " + + " FROM \"orders$query:%s\") " + + "SELECT r.aggregations.max_orderkey.value, r.aggregations.sum_orderkey.value " + + "FROM data", BaseEncoding.base32().encode(query.getBytes(UTF_8))), + "Pass-through query not supported\\. Please turn it on explicitly using elasticsearch\\.legacy-pass-through-query\\.enabled feature toggle"); + } +} diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch6ConnectorTest.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch6ConnectorTest.java index ac3399376977..c5a293bd8eb6 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch6ConnectorTest.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch6ConnectorTest.java @@ -28,7 +28,7 @@ public class TestElasticsearch6ConnectorTest { public TestElasticsearch6ConnectorTest() { - super("docker.elastic.co/elasticsearch/elasticsearch-oss:6.6.0"); + super("docker.elastic.co/elasticsearch/elasticsearch-oss:6.6.0", "elasticsearch6"); } @Test diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch7ConnectorTest.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch7ConnectorTest.java index 1b3aff346835..b48d76c34cc0 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch7ConnectorTest.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearch7ConnectorTest.java @@ -21,7 +21,7 @@ public class TestElasticsearch7ConnectorTest { public TestElasticsearch7ConnectorTest() { - super(ELASTICSEARCH_7_IMAGE); + super(ELASTICSEARCH_7_IMAGE, "elasticsearch7"); } @Override diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java index 9b3b8081b24c..e83ac4536165 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java @@ -57,7 +57,8 @@ public void testDefaults() .setTruststorePassword(null) .setVerifyHostnames(true) .setIgnorePublishAddress(false) - .setSecurity(null)); + .setSecurity(null) + .setLegacyPassThroughQueryEnabled(false)); } @Test @@ -89,6 +90,7 @@ public void testExplicitPropertyMappings() .put("elasticsearch.tls.verify-hostnames", "false") .put("elasticsearch.ignore-publish-address", "true") .put("elasticsearch.security", "AWS") + .put("elasticsearch.legacy-pass-through-query.enabled", "true") .buildOrThrow(); ElasticsearchConfig expected = new ElasticsearchConfig() @@ -112,7 +114,8 @@ public void testExplicitPropertyMappings() .setTruststorePassword("truststore-password") .setVerifyHostnames(false) .setIgnorePublishAddress(true) - .setSecurity(AWS); + .setSecurity(AWS) + .setLegacyPassThroughQueryEnabled(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchOpenSearchConnectorTest.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchOpenSearchConnectorTest.java index cc8a2738963d..ee0dd999729d 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchOpenSearchConnectorTest.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchOpenSearchConnectorTest.java @@ -21,7 +21,7 @@ public class TestElasticsearchOpenSearchConnectorTest public TestElasticsearchOpenSearchConnectorTest() { // 1.0.0 and 1.0.1 causes NotSslRecordException during the initialization - super("opensearchproject/opensearch:1.1.0"); + super("opensearchproject/opensearch:1.1.0", "opensearch"); } @Override