From 269187b139a309363eba956bfcc796d4ad4392bf Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Tue, 11 Feb 2025 17:49:35 +0800 Subject: [PATCH 1/3] Make basic aggregation working (partial) Signed-off-by: Lantao Jin --- .../sql/calcite/CalcitePlanContext.java | 12 -- .../sql/calcite/plan/OpenSearchQueryable.java | 10 +- .../sql/calcite/plan/OpenSearchTable.java | 2 - .../calcite/utils/OpenSearchRelDataTypes.java | 2 + .../sql/executor/OpenSearchTypeSystem.java | 41 +++++ .../opensearch/sql/executor/QueryService.java | 81 +++++---- integ-test/build.gradle | 7 +- .../sql/calcite/CalcitePPLAggregationIT.java | 159 ++++++++++++++++++ .../sql/calcite/CalcitePPLBasicIT.java | 142 ++++++++++++++++ .../CalcitePPLIntegTestCase.java} | 149 ++++++---------- .../executor/OpenSearchExecutionEngine.java | 56 +++--- .../opensearch/storage/OpenSearchIndex.java | 2 + .../scan/OpenSearchIndexEnumerator.java | 33 +++- .../sql/opensearch/util/JdbcUtil.java | 85 ++++++++++ .../sql/opensearch/util/RestRequestUtil.java | 11 +- .../plugin-metadata/plugin-security.policy | 4 + 16 files changed, 614 insertions(+), 182 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java rename integ-test/src/test/java/org/opensearch/sql/{ppl/CalciteStandaloneIT.java => calcite/CalcitePPLIntegTestCase.java} (80%) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 7355d2f4bf5..9efbded7bcd 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -8,25 +8,13 @@ import java.util.function.BiFunction; import lombok.Getter; import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.plan.Context; -import org.apache.calcite.plan.RelOptCluster; -import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; -import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.ast.expression.UnresolvedExpression; public class CalcitePlanContext { - public static class OSRelBuilder extends RelBuilder { - - protected OSRelBuilder( - @Nullable Context context, RelOptCluster cluster, @Nullable RelOptSchema relOptSchema) { - super(context, cluster, relOptSchema); - } - } - public FrameworkConfig config; public CalciteConnection connection; public final RelBuilder relBuilder; diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchQueryable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchQueryable.java index ff55d0da0e6..bf552590410 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchQueryable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchQueryable.java @@ -5,11 +5,13 @@ package org.opensearch.sql.calcite.plan; +import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.impl.AbstractTableQueryable; +/** not in use now */ public class OpenSearchQueryable extends AbstractTableQueryable { OpenSearchQueryable( @@ -19,6 +21,12 @@ public class OpenSearchQueryable extends AbstractTableQueryable { @Override public Enumerator enumerator() { - throw new UnsupportedOperationException("enumerator"); + //noinspection unchecked + final Enumerable enumerable = (Enumerable) getTable().search(); + return enumerable.enumerator(); + } + + private OpenSearchTable getTable() { + return (OpenSearchTable) table; } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java index 592fb105f2f..64f7ebc8e54 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java @@ -36,8 +36,6 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { final RelOptCluster cluster = context.getCluster(); - // return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), - // relOptTable); return new OpenSearchTableScan(cluster, relOptTable, this); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelDataTypes.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelDataTypes.java index 16c6cf04587..04c7b29d451 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelDataTypes.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelDataTypes.java @@ -88,6 +88,8 @@ public static RelDataType convertSchemaField(ExprType fieldType, boolean nullabl } else { if (fieldType.legacyTypeName().equalsIgnoreCase("binary")) { return TYPE_FACTORY.createSqlType(SqlTypeName.BINARY, nullable); + } else if (fieldType.legacyTypeName().equalsIgnoreCase("timestamp")) { + return TYPE_FACTORY.createSqlType(SqlTypeName.TIMESTAMP, nullable); } else if (fieldType.legacyTypeName().equalsIgnoreCase("geo_point")) { return TYPE_FACTORY.createSqlType(SqlTypeName.GEOMETRY, nullable); } else if (fieldType.legacyTypeName().equalsIgnoreCase("text")) { diff --git a/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java b/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java new file mode 100644 index 00000000000..57404d3260d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.type.SqlTypeName; + +public class OpenSearchTypeSystem extends RelDataTypeSystemImpl { + public static final RelDataTypeSystem INSTANCE = new OpenSearchTypeSystem(); + + private OpenSearchTypeSystem() {} + + @Override + public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argumentType) { + switch (argumentType.getSqlTypeName()) { + case INTEGER: + case BIGINT: + return typeFactory.createSqlType(SqlTypeName.DOUBLE); + + default: + return super.deriveSumType(typeFactory, argumentType); + } + } + + public RelDataType deriveAvgAggType(RelDataTypeFactory typeFactory, RelDataType argumentType) { + switch (argumentType.getSqlTypeName()) { + case INTEGER: + case BIGINT: + return typeFactory.createSqlType(SqlTypeName.DOUBLE); + + default: + return super.deriveSumType(typeFactory, argumentType); + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index baffc2e4db3..eb6634d87dd 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -8,6 +8,8 @@ package org.opensearch.sql.executor; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.List; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; @@ -17,6 +19,7 @@ import org.apache.calcite.jdbc.Driver; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.tools.FrameworkConfig; @@ -62,32 +65,49 @@ public class QueryService { */ public void execute( UnresolvedPlan plan, ResponseListener listener) { - try { - try { - // Use simple calcite schema since we don't compute tables in advance of the query. - CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false); - CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); - CalciteConnection connection = - factory.newConnection( - new Driver(), factory, "", new java.util.Properties(), rootSchema, null); - final SchemaPlus defaultSchema = - connection - .getRootSchema() - .add( - OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, - new OpenSearchSchema(dataSourceService)); - // Set opensearch schema as the default schema in config, otherwise we need to explicitly - // add schema path 'OpenSearch' before the opensearch table name - final FrameworkConfig config = buildFrameworkConfig(defaultSchema); - final CalcitePlanContext context = new CalcitePlanContext(config, connection); - executePlanByCalcite(analyze(plan, context), context, listener); - } catch (Exception e) { - LOG.warn("Fallback to V2 query engine since got exception", e); - executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); - } - } catch (Exception e) { - listener.onFailure(e); - } + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + if (relNodeVisitor == null) { + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + } else { + try { + // Use simple calcite schema since we don't compute tables in advance of the + // query. + CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false); + CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); + CalciteConnection connection = + factory.newConnection( + new Driver(), + factory, + "", + new java.util.Properties(), + rootSchema, + null); + final SchemaPlus defaultSchema = + connection + .getRootSchema() + .add( + OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, + new OpenSearchSchema(dataSourceService)); + // Set opensearch schema as the default schema in config, otherwise we need to + // explicitly + // add schema path 'OpenSearch' before the opensearch table name + final FrameworkConfig config = buildFrameworkConfig(defaultSchema); + final CalcitePlanContext context = new CalcitePlanContext(config, connection); + executePlanByCalcite(analyze(plan, context), context, listener); + } catch (Exception e) { + LOG.warn("Fallback to V2 query engine since got exception", e); + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + } + } + return null; + } catch (Exception e) { + listener.onFailure(e); + return null; + } + }); } /** @@ -120,11 +140,7 @@ public void executePlanByCalcite( RelNode plan, CalcitePlanContext context, ResponseListener listener) { - try { - executionEngine.execute(optimize(plan), context, listener); - } catch (Exception e) { - listener.onFailure(e); - } + executionEngine.execute(optimize(plan), context, listener); } /** @@ -157,7 +173,8 @@ private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) { .parserConfig(SqlParser.Config.DEFAULT) // TODO check .defaultSchema(defaultSchema) .traitDefs((List) null) - .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)) + .programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE)) + .typeSystem(OpenSearchTypeSystem.INSTANCE) .build(); } diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 8d8f7b89a73..87d9f66d2a5 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -48,6 +48,7 @@ String baseVersion = "2.17.0" String bwcVersion = baseVersion + ".0"; String baseName = "sqlBwcCluster" String bwcFilePath = "src/test/resources/bwc/" +String calciteCodegen = "$projectDir/src/test/java/codegen/" repositories { mavenCentral() @@ -427,7 +428,11 @@ integTest { finalizedBy stopPrometheus } - systemProperty 'java.security.manager', 'disallow' + // enable calcite codegen in IT + systemProperty 'calcite.debug', 'false' + systemProperty 'org.codehaus.janino.source_debugging.enable', 'false' + systemProperty 'org.codehaus.janino.source_debugging.dir', calciteCodegen + systemProperty 'tests.security.manager', 'false' systemProperty('project.root', project.projectDir.absolutePath) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java new file mode 100644 index 00000000000..274230ce28c --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; + +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; + +public class CalcitePPLAggregationIT extends CalcitePPLIntegTestCase { + + @Override + public void init() throws IOException { + super.init(); + + loadIndex(Index.BANK); + } + + @Test + public void testSimpleCount0() throws IOException { + Request request1 = new Request("PUT", "/test/_doc/1?refresh=true"); + request1.setJsonEntity("{\"name\": \"hello\", \"age\": 20}"); + client().performRequest(request1); + Request request2 = new Request("PUT", "/test/_doc/2?refresh=true"); + request2.setJsonEntity("{\"name\": \"world\", \"age\": 30}"); + client().performRequest(request2); + + String actual = execute("source=test | stats count() as c"); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"c\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 2\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @Test + public void testSimpleCount() { + String actual = execute(String.format("source=%s | stats count() as c", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"c\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 7\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @Test + public void testSimpleAvg() { + String actual = execute(String.format("source=%s | stats avg(balance)", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"avg(balance)\",\n" + + " \"type\": \"double\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 26710.428571428572\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @Test + public void testSumAvg() { + String actual = execute(String.format("source=%s | stats sum(balance)", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"sum(balance)\",\n" + + " \"type\": \"double\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 186973.0\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @org.junit.Test + public void testMultipleAggregatesWithAliases() { + String actual = + execute( + String.format( + "source=%s | stats avg(balance) as avg, max(balance) as max, min(balance) as min," + + " count()", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"avg\",\n" + + " \"type\": \"double\"\n" + + " },\n" + + " {\n" + + " \"name\": \"max\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + + " \"name\": \"min\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + + " \"name\": \"count()\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 26710.428571428572,\n" + + " 48086,\n" + + " 4180,\n" + + " 7\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java new file mode 100644 index 00000000000..2b4f58db207 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java @@ -0,0 +1,142 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite; + +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; + +public class CalcitePPLBasicIT extends CalcitePPLIntegTestCase { + + @Override + public void init() throws IOException { + super.init(); + Request request1 = new Request("PUT", "/test/_doc/1?refresh=true"); + request1.setJsonEntity("{\"name\": \"hello\", \"age\": 20}"); + client().performRequest(request1); + Request request2 = new Request("PUT", "/test/_doc/2?refresh=true"); + request2.setJsonEntity("{\"name\": \"world\", \"age\": 30}"); + client().performRequest(request2); + } + + @Test + public void testInvalidTable() { + assertThrows( + "OpenSearch exception [type=index_not_found_exception, reason=no such index [unknown]]", + IllegalStateException.class, + () -> execute("source=unknown")); + } + + @Test + public void testSourceFieldQuery() { + String actual = execute("source=test | fields name"); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"hello\"\n" + + " ],\n" + + " [\n" + + " \"world\"\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } + + @Test + public void testFilterQuery1() { + String actual = execute("source=test | where age = 30 | fields name, age"); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"world\",\n" + + " 30\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @Test + public void testFilterQuery2() { + String actual = execute("source=test | where age = 20 | fields name, age"); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"hello\",\n" + + " 20\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @Test + public void testFilterQuery3() { + String actual = execute("source=test | where age > 10 | fields name, age"); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"hello\",\n" + + " 20\n" + + " ],\n" + + " [\n" + + " \"world\",\n" + + " 30\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/CalciteStandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java similarity index 80% rename from integ-test/src/test/java/org/opensearch/sql/ppl/CalciteStandaloneIT.java rename to integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java index 87904239a97..cbb3a86e421 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/CalciteStandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.ppl; +package org.opensearch.sql.calcite; import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; @@ -17,8 +17,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import lombok.RequiredArgsConstructor; -import org.junit.jupiter.api.Test; -import org.opensearch.client.Request; import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.inject.AbstractModule; @@ -28,7 +26,6 @@ import org.opensearch.common.inject.Singleton; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.DataSourceService; @@ -37,7 +34,6 @@ import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.executor.QueryManager; import org.opensearch.sql.executor.QueryService; import org.opensearch.sql.executor.execution.QueryPlanFactory; @@ -55,6 +51,8 @@ import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; +import org.opensearch.sql.ppl.PPLIntegTestCase; +import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; @@ -65,18 +63,13 @@ import org.opensearch.sql.storage.StorageEngine; import org.opensearch.sql.util.ExecuteOnCallerThreadQueryManager; -/** - * Run PPL with query engine outside OpenSearch cluster with Calcite implementation. This IT doesn't - * require our plugin installed actually. The client application, ex. JDBC driver, needs to - * initialize all components itself required by ppl service. - */ -public class CalciteStandaloneIT extends PPLIntegTestCase { - - private PPLService pplService; +public abstract class CalcitePPLIntegTestCase extends PPLIntegTestCase { + protected PPLService pplService; @Override - public void init() { - RestHighLevelClient restClient = new InternalRestHighLevelClient(client()); + public void init() throws IOException { + RestHighLevelClient restClient = + new CalcitePPLIntegTestCase.InternalRestHighLevelClient(client()); OpenSearchClient client = new OpenSearchRestClient(restClient); DataSourceService dataSourceService = new DataSourceServiceImpl( @@ -89,52 +82,43 @@ public void init() { ModulesBuilder modules = new ModulesBuilder(); modules.add( - new StandaloneModule( - new InternalRestHighLevelClient(client()), defaultSettings(), dataSourceService)); + new CalcitePPLIntegTestCase.StandaloneModule( + new CalcitePPLIntegTestCase.InternalRestHighLevelClient(client()), + defaultSettings(), + dataSourceService)); Injector injector = modules.createInjector(); pplService = SecurityAccess.doPrivileged(() -> injector.getInstance(PPLService.class)); } - @Test - public void testSourceFieldQuery() throws IOException { - Request request1 = new Request("PUT", "/test/_doc/1?refresh=true"); - request1.setJsonEntity("{\"name\": \"hello\", \"age\": 20}"); - client().performRequest(request1); - Request request2 = new Request("PUT", "/test/_doc/2?refresh=true"); - request2.setJsonEntity("{\"name\": \"world\", \"age\": 30}"); - client().performRequest(request2); - - String actual = executeByStandaloneQueryEngine("source=test | fields name"); - assertEquals( - "{\n" - + " \"schema\": [\n" - + " {\n" - + " \"name\": \"name\",\n" - + " \"type\": \"string\"\n" - + " }\n" - + " ],\n" - + " \"datarows\": [\n" - + " [\n" - + " \"hello\"\n" - + " ],\n" - + " [\n" - + " \"world\"\n" - + " ]\n" - + " ],\n" - + " \"total\": 2,\n" - + " \"size\": 2\n" - + "}", - actual); + private Settings defaultSettings() { + return new Settings() { + private final Map defaultSettings = + new ImmutableMap.Builder() + .put(Key.QUERY_SIZE_LIMIT, 200) + .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) + .put(Key.FIELD_TYPE_TOLERANCE, true) + .build(); + + @Override + public T getSettingValue(Key key) { + return (T) defaultSettings.get(key); + } + + @Override + public List getSettings() { + return (List) defaultSettings; + } + }; } - private String executeByStandaloneQueryEngine(String query) { + protected String execute(String query) { AtomicReference actual = new AtomicReference<>(); pplService.execute( new PPLQueryRequest(query, null, null), - new ResponseListener() { + new ResponseListener() { @Override - public void onResponse(QueryResponse response) { + public void onResponse(ExecutionEngine.QueryResponse response) { QueryResult result = new QueryResult(response.getSchema(), response.getResults()); String json = new SimpleJsonResponseFormatter(PRETTY).format(result); actual.set(json); @@ -148,24 +132,33 @@ public void onFailure(Exception e) { return actual.get(); } - private Settings defaultSettings() { - return new Settings() { - private final Map defaultSettings = - new ImmutableMap.Builder() - .put(Key.QUERY_SIZE_LIMIT, 200) - .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) - .put(Key.FIELD_TYPE_TOLERANCE, true) - .build(); - + public static DataSourceMetadataStorage getDataSourceMetadataStorage() { + return new DataSourceMetadataStorage() { @Override - public T getSettingValue(Key key) { - return (T) defaultSettings.get(key); + public List getDataSourceMetadata() { + return Collections.emptyList(); } @Override - public List getSettings() { - return (List) defaultSettings; + public Optional getDataSourceMetadata(String datasourceName) { + return Optional.empty(); } + + @Override + public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {} + + @Override + public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {} + + @Override + public void deleteDataSourceMetadata(String datasourceName) {} + }; + } + + public static DataSourceUserAuthorizationHelper getDataSourceUserRoleHelper() { + return new DataSourceUserAuthorizationHelper() { + @Override + public void authorizeDataSource(DataSourceMetadata dataSourceMetadata) {} }; } @@ -250,34 +243,4 @@ public QueryPlanFactory queryPlanFactory(ExecutionEngine executionEngine) { return new QueryPlanFactory(queryService); } } - - public static DataSourceMetadataStorage getDataSourceMetadataStorage() { - return new DataSourceMetadataStorage() { - @Override - public List getDataSourceMetadata() { - return Collections.emptyList(); - } - - @Override - public Optional getDataSourceMetadata(String datasourceName) { - return Optional.empty(); - } - - @Override - public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {} - - @Override - public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {} - - @Override - public void deleteDataSourceMetadata(String datasourceName) {} - }; - } - - public static DataSourceUserAuthorizationHelper getDataSourceUserRoleHelper() { - return new DataSourceUserAuthorizationHelper() { - @Override - public void authorizeDataSource(DataSourceMetadata dataSourceMetadata) {} - }; - } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 0fd65dd9325..9d5702ead9e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -5,6 +5,8 @@ package org.opensearch.sql.opensearch.executor; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -16,15 +18,12 @@ import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.tools.RelRunner; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; @@ -32,6 +31,7 @@ import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector; +import org.opensearch.sql.opensearch.util.JdbcUtil; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.TableScanOperator; @@ -108,19 +108,24 @@ public ExplainResponseNode visitTableScan( @Override public void execute( RelNode rel, CalcitePlanContext context, ResponseListener listener) { - Connection connection = context.connection; - try { - RelRunner runner = connection.unwrap(RelRunner.class); - PreparedStatement statement = runner.prepareStatement(rel); - ResultSet result = statement.executeQuery(); - printResultSet(result, listener); - } catch (SQLException e) { - throw new RuntimeException(e); - } + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + Connection connection = context.connection; + try { + RelRunner relRunner = connection.unwrap(RelRunner.class); + try (PreparedStatement statement = relRunner.prepareStatement(rel)) { + ResultSet resultSet = statement.executeQuery(); + buildResultSet(resultSet, listener); + } + return null; + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); } - // for testing only - private void printResultSet(ResultSet resultSet, ResponseListener listener) + private void buildResultSet(ResultSet resultSet, ResponseListener listener) throws SQLException { // Get the ResultSet metadata to know about columns ResultSetMetaData metaData = resultSet.getMetaData(); @@ -133,29 +138,22 @@ private void printResultSet(ResultSet resultSet, ResponseListener // Loop through each column for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); - String value = resultSet.getString(i); - System.out.println(columnName + ": " + value); - - row.put(columnName, new ExprStringValue(value)); + int sqlType = metaData.getColumnType(i); + ExprValue exprValue = JdbcUtil.getExprValueFromSqlType(resultSet, i, sqlType); + row.put(columnName, exprValue); } values.add(ExprTupleValue.fromExprValueMap(row)); - System.out.println("-------------------"); // Separator between rows } List columns = new ArrayList<>(metaData.getColumnCount()); for (int i = 1; i <= columnCount; ++i) { - // TODO: mapping RelDataType to ExprType or deprecate ExprType - columns.add(new Column(metaData.getColumnName(i), null, ExprCoreType.STRING)); + String columnName = metaData.getColumnName(i); + int sqlType = metaData.getColumnType(i); + ExprType exprType = JdbcUtil.getExprTypeFromSqlType(sqlType); + columns.add(new Column(columnName, null, exprType)); } Schema schema = new Schema(columns); QueryResponse response = new QueryResponse(schema, values, null); listener.onResponse(response); } - - private RelDataType makeStruct(RelDataTypeFactory typeFactory, RelDataType type) { - if (type.isStruct()) { - return type; - } - return typeFactory.builder().add("$0", type).build(); - } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 2ef95173a1f..f4085fac336 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -8,6 +8,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; import lombok.RequiredArgsConstructor; @@ -242,6 +243,7 @@ public Enumerator enumerator() { new OpenSearchRequestBuilder(querySizeLimit, createExprValueFactory(), settings); return new OpenSearchIndexEnumerator( client, + List.copyOf(getFieldTypes().keySet()), builder.getMaxResponseSize(), builder.build(indexName, getMaxResultWindow(), cursorKeepAlive, client)); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 573bb7965cf..407c945fee9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -7,6 +7,7 @@ import java.util.Collections; import java.util.Iterator; +import java.util.List; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.calcite.linq4j.Enumerator; @@ -20,6 +21,8 @@ public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; + private final List fields; + /** Search request. */ @EqualsAndHashCode.Include @ToString.Include private final OpenSearchRequest request; @@ -32,14 +35,19 @@ public class OpenSearchIndexEnumerator implements Enumerator { /** Search response for current batch. */ private Iterator iterator; + private ExprValue current; + public OpenSearchIndexEnumerator( - OpenSearchClient client, int maxResponseSize, OpenSearchRequest request) { + OpenSearchClient client, + List fields, + int maxResponseSize, + OpenSearchRequest request) { this.client = client; + this.fields = fields; this.maxResponseSize = maxResponseSize; this.request = request; this.queryCount = 0; - this.iterator = Collections.emptyIterator(); - fetchNextBatch(); + this.current = null; } private void fetchNextBatch() { @@ -51,27 +59,36 @@ private void fetchNextBatch() { @Override public Object current() { - queryCount++; - return iterator.next().tupleValue().values().stream().map(ExprValue::value).toArray(); + Object[] p = fields.stream().map(k -> current.tupleValue().get(k).value()).toArray(); + return p; } @Override public boolean moveNext() { if (queryCount >= maxResponseSize) { iterator = Collections.emptyIterator(); - } else if (!iterator.hasNext()) { + return false; + } else if (iterator == null || !iterator.hasNext()) { fetchNextBatch(); } - return iterator.hasNext(); + if (iterator.hasNext()) { + current = iterator.next(); + queryCount++; + return true; + } else { + return false; + } } @Override public void reset() { - throw new UnsupportedOperationException(); + iterator = Collections.emptyIterator(); + queryCount = 0; } @Override public void close() { + reset(); client.cleanup(request); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java new file mode 100644 index 00000000000..d758614d1a7 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.util; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import lombok.experimental.UtilityClass; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; + +@UtilityClass +public class JdbcUtil { + + public static ExprType getExprTypeFromSqlType(int sqlType) { + switch (sqlType) { + case Types.INTEGER: + return ExprCoreType.INTEGER; + case Types.BIGINT: + return ExprCoreType.LONG; + case Types.DOUBLE: + case Types.DECIMAL: + case Types.NUMERIC: + return ExprCoreType.DOUBLE; + case Types.FLOAT: + return ExprCoreType.FLOAT; + case Types.DATE: + return ExprCoreType.DATE; + case Types.TIMESTAMP: + return ExprCoreType.TIMESTAMP; + case Types.BOOLEAN: + return ExprCoreType.BOOLEAN; + case Types.VARCHAR: + case Types.CHAR: + case Types.LONGVARCHAR: + default: + return ExprCoreType.STRING; + } + } + + public static ExprValue getExprValueFromSqlType(ResultSet rs, int i, int sqlType) + throws SQLException { + Object value; + switch (sqlType) { + case Types.VARCHAR: + case Types.CHAR: + case Types.LONGVARCHAR: + value = rs.getString(i); + break; + case Types.INTEGER: + value = rs.getInt(i); + break; + case Types.BIGINT: + value = rs.getLong(i); + break; + case Types.DECIMAL: + case Types.NUMERIC: + value = rs.getBigDecimal(i); + break; + case Types.DOUBLE: + value = rs.getDouble(i); + break; + case Types.FLOAT: + value = rs.getFloat(i); + break; + case Types.DATE: + value = rs.getDate(i); + break; + case Types.TIMESTAMP: + value = rs.getTimestamp(i); + break; + case Types.BOOLEAN: + value = rs.getBoolean(i); + break; + default: + value = rs.getObject(i); + } + return ExprValueUtils.fromObjectValue(value); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java index e02bcf5af9d..5ce982da58b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java @@ -1,17 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql.opensearch.util; import lombok.NonNull; +import lombok.experimental.UtilityClass; import org.opensearch.client.node.NodeClient; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; /** RestRequestUtil is a utility class for common operations on OpenSearch RestRequest's. */ +@UtilityClass public class RestRequestUtil { - private RestRequestUtil() { - // utility class - } - /** * Utility method for consuming all the request parameters. Doing this will ensure that the * BaseRestHandler doesn't fail the request with an unconsumed parameter exception. diff --git a/plugin/src/main/plugin-metadata/plugin-security.policy b/plugin/src/main/plugin-metadata/plugin-security.policy index fcf70c01f93..4a5bf3ff242 100644 --- a/plugin/src/main/plugin-metadata/plugin-security.policy +++ b/plugin/src/main/plugin-metadata/plugin-security.policy @@ -24,4 +24,8 @@ grant { permission javax.management.MBeanServerPermission "findMBeanServer"; permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*"; permission javax.management.MBeanTrustPermission "register"; + + // Calcite + permission java.util.PropertyPermission "*", "read,write"; + permission java.lang.RuntimePermission "*"; }; From df413dbdf83a5ea12df8c1c3848b24cae0be142c Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Wed, 12 Feb 2025 16:34:30 +0800 Subject: [PATCH 2/3] add a settings to enable calcite Signed-off-by: Lantao Jin --- .../sql/common/setting/Settings.java | 3 ++ .../sql/executor/OpenSearchTypeSystem.java | 12 ----- .../opensearch/sql/executor/QueryService.java | 44 +++++++++++-------- .../sql/calcite/CalcitePPLAggregationIT.java | 4 +- .../sql/calcite/CalcitePPLIntegTestCase.java | 4 +- .../setting/OpenSearchSettings.java | 14 ++++++ .../plugin/config/OpenSearchPluginModule.java | 5 ++- 7 files changed, 50 insertions(+), 36 deletions(-) diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index a9fa693a228..a39989ed3cf 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -28,6 +28,9 @@ public enum Key { /** PPL Settings. */ PPL_ENABLED("plugins.ppl.enabled"), + /** Enable Calcite as execution engine */ + CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"), + /** Query Settings. */ FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"), diff --git a/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java b/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java index 57404d3260d..aba30bf2fc0 100644 --- a/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java +++ b/core/src/main/java/org/opensearch/sql/executor/OpenSearchTypeSystem.java @@ -16,18 +16,6 @@ public class OpenSearchTypeSystem extends RelDataTypeSystemImpl { private OpenSearchTypeSystem() {} - @Override - public RelDataType deriveSumType(RelDataTypeFactory typeFactory, RelDataType argumentType) { - switch (argumentType.getSqlTypeName()) { - case INTEGER: - case BIGINT: - return typeFactory.createSqlType(SqlTypeName.DOUBLE); - - default: - return super.deriveSumType(typeFactory, argumentType); - } - } - public RelDataType deriveAvgAggType(RelDataTypeFactory typeFactory, RelDataType argumentType) { switch (argumentType.getSqlTypeName()) { case INTEGER: diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index eb6634d87dd..b08ae2e567e 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -34,6 +34,7 @@ import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.calcite.OpenSearchSchema; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; @@ -56,6 +57,8 @@ public class QueryService { private DataSourceService dataSourceService; + private Settings settings; + /** * Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
* Todo. deprecated this interface after finalize {@link PlanContext}. @@ -65,14 +68,18 @@ public class QueryService { */ public void execute( UnresolvedPlan plan, ResponseListener listener) { - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - try { - if (relNodeVisitor == null) { - executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); - } else { - try { + try { + boolean calciteEnabled = false; + if (settings != null) { + calciteEnabled = settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED); + } + if (!calciteEnabled || relNodeVisitor == null) { + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + } else { + try { + AccessController.doPrivileged( + (PrivilegedAction) + () -> { // Use simple calcite schema since we don't compute tables in advance of the // query. CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false); @@ -97,17 +104,16 @@ public void execute( final FrameworkConfig config = buildFrameworkConfig(defaultSchema); final CalcitePlanContext context = new CalcitePlanContext(config, connection); executePlanByCalcite(analyze(plan, context), context, listener); - } catch (Exception e) { - LOG.warn("Fallback to V2 query engine since got exception", e); - executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); - } - } - return null; - } catch (Exception e) { - listener.onFailure(e); - return null; - } - }); + return null; + }); + } catch (Exception e) { + LOG.warn("Fallback to V2 query engine since got exception", e); + executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener); + } + } + } catch (Exception e) { + listener.onFailure(e); + } } /** diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java index 274230ce28c..f2a16ff7009 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java @@ -101,12 +101,12 @@ public void testSumAvg() { + " \"schema\": [\n" + " {\n" + " \"name\": \"sum(balance)\",\n" - + " \"type\": \"double\"\n" + + " \"type\": \"long\"\n" + " }\n" + " ],\n" + " \"datarows\": [\n" + " [\n" - + " 186973.0\n" + + " 186973\n" + " ]\n" + " ],\n" + " \"total\": 1,\n" diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java index cbb3a86e421..22d196cfc90 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLIntegTestCase.java @@ -97,6 +97,7 @@ private Settings defaultSettings() { .put(Key.QUERY_SIZE_LIMIT, 200) .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) .put(Key.FIELD_TYPE_TOLERANCE, true) + .put(Key.CALCITE_ENGINE_ENABLED, true) .build(); @Override @@ -239,7 +240,8 @@ public QueryPlanFactory queryPlanFactory(ExecutionEngine executionEngine) { Planner planner = new Planner(LogicalPlanOptimizer.create()); CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor(); QueryService queryService = - new QueryService(analyzer, executionEngine, planner, relNodeVisitor, dataSourceService); + new QueryService( + analyzer, executionEngine, planner, relNodeVisitor, dataSourceService, settings); return new QueryPlanFactory(queryService); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 612771eea43..91c0c9e7352 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -85,6 +85,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting CALCITE_ENGINE_ENABLED_SETTING = + Setting.boolSetting( + Key.CALCITE_ENGINE_ENABLED.getKeyValue(), + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting QUERY_MEMORY_LIMIT_SETTING = new Setting<>( Key.QUERY_MEMORY_LIMIT.getKeyValue(), @@ -276,6 +283,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.PPL_ENABLED, PPL_ENABLED_SETTING, new Updater(Key.PPL_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.CALCITE_ENGINE_ENABLED, + CALCITE_ENGINE_ENABLED_SETTING, + new Updater(Key.CALCITE_ENGINE_ENABLED)); register( settingBuilder, clusterSettings, @@ -450,6 +463,7 @@ public static List> pluginSettings() { .add(SQL_DELETE_ENABLED_SETTING) .add(SQL_PAGINATION_API_SEARCH_AFTER_SETTING) .add(PPL_ENABLED_SETTING) + .add(CALCITE_ENGINE_ENABLED_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) .add(QUERY_SIZE_LIMIT_SETTING) .add(METRICS_ROLLING_WINDOW_SETTING) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java index 7f1c3e9c860..5d6c3f325db 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java @@ -98,14 +98,15 @@ public SQLService sqlService(QueryManager queryManager, QueryPlanFactory queryPl /** {@link QueryPlanFactory}. */ @Provides public QueryPlanFactory queryPlanFactory( - DataSourceService dataSourceService, ExecutionEngine executionEngine) { + DataSourceService dataSourceService, ExecutionEngine executionEngine, Settings settings) { Analyzer analyzer = new Analyzer( new ExpressionAnalyzer(functionRepository), dataSourceService, functionRepository); Planner planner = new Planner(LogicalPlanOptimizer.create()); CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor(); QueryService queryService = - new QueryService(analyzer, executionEngine, planner, relNodeVisitor, dataSourceService); + new QueryService( + analyzer, executionEngine, planner, relNodeVisitor, dataSourceService, settings); return new QueryPlanFactory(queryService); } } From d9acec6edb8e8c8dd34ee5937ad888b17c9c1442 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Wed, 12 Feb 2025 18:30:47 +0800 Subject: [PATCH 3/3] add more UTs Signed-off-by: Lantao Jin --- .../sql/calcite/CalcitePPLAggregationIT.java | 146 ++++- .../sql/calcite/CalcitePPLBasicIT.java | 579 +++++++++++++++++- .../sql/opensearch/util/JdbcUtil.java | 11 +- ppl/src/main/antlr/OpenSearchPPLParser.g4 | 5 +- .../sql/ppl/parser/AstExpressionBuilder.java | 6 + .../sql/ppl/calcite/CalcitePPLBasicTest.java | 4 +- 6 files changed, 744 insertions(+), 7 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java index f2a16ff7009..ac3d610ba20 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLAggregationIT.java @@ -115,7 +115,7 @@ public void testSumAvg() { actual); } - @org.junit.Test + @Test public void testMultipleAggregatesWithAliases() { String actual = execute( @@ -156,4 +156,148 @@ public void testMultipleAggregatesWithAliases() { + "}", actual); } + + @Test + public void testAvgByField() { + String actual = + execute(String.format("source=%s | stats avg(balance) by gender", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"avg(balance)\",\n" + + " \"type\": \"double\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"F\",\n" + + " 40488.0\n" + + " ],\n" + + " [\n" + + " \"M\",\n" + + " 16377.25\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } + + @org.junit.Test + public void testAvgBySpan() { + String actual = + execute(String.format("source=%s | stats avg(balance) by span(age, 10)", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"span(age,10)\",\n" + + " \"type\": \"double\"\n" + + " },\n" + + " {\n" + + " \"name\": \"avg(balance)\",\n" + + " \"type\": \"double\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 20.0,\n" + + " 32838.0\n" + + " ],\n" + + " [\n" + + " 30.0,\n" + + " 25689.166666666668\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } + + @Test + public void testAvgBySpanAndFields() { + String actual = + execute( + String.format( + "source=%s | stats avg(balance) by span(age, 10) as age_span, gender", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age_span\",\n" + + " \"type\": \"double\"\n" + + " },\n" + + " {\n" + + " \"name\": \"avg(balance)\",\n" + + " \"type\": \"double\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"F\",\n" + + " 30.0,\n" + + " 44313.0\n" + + " ],\n" + + " [\n" + + " \"M\",\n" + + " 30.0,\n" + + " 16377.25\n" + + " ],\n" + + " [\n" + + " \"F\",\n" + + " 20.0,\n" + + " 32838.0\n" + + " ]\n" + + " ],\n" + + " \"total\": 3,\n" + + " \"size\": 3\n" + + "}", + actual); + } + + // TODO fallback to V2 because missing conversion LogicalAggregate[convention: NONE -> ENUMERABLE] + @Test + public void testCountDistinct() { + String actual = + execute( + String.format("source=%s | stats distinct_count(state) by gender", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"distinct_count(state)\",\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 3,\n" + + " \"f\"\n" + + " ],\n" + + " [\n" + + " 4,\n" + + " \"m\"\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java index 2b4f58db207..6bb37b5b0d1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalcitePPLBasicIT.java @@ -5,7 +5,10 @@ package org.opensearch.sql.calcite; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; + import java.io.IOException; +import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; @@ -20,6 +23,8 @@ public void init() throws IOException { Request request2 = new Request("PUT", "/test/_doc/2?refresh=true"); request2.setJsonEntity("{\"name\": \"world\", \"age\": 30}"); client().performRequest(request2); + + loadIndex(Index.BANK); } @Test @@ -111,7 +116,7 @@ public void testFilterQuery2() { @Test public void testFilterQuery3() { - String actual = execute("source=test | where age > 10 | fields name, age"); + String actual = execute("source=test | where age > 10 AND age < 100 | fields name, age"); assertEquals( "{\n" + " \"schema\": [\n" @@ -139,4 +144,576 @@ public void testFilterQuery3() { + "}", actual); } + + @Test + public void testFilterQueryWithOr() { + String actual = + execute( + String.format( + "source=%s | where (account_number = 25 or balance > 10000) and gender = 'M' |" + + " fields firstname, lastname", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"firstname\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"lastname\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"Amber JOHnny\",\n" + + " \"Duke Willmington\"\n" + + " ],\n" + + " [\n" + + " \"Elinor\",\n" + + " \"Ratliff\"\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } + + @Test + public void testFilterQueryWithOr2() { + String actual = + execute( + String.format( + "source=%s (account_number = 25 or balance > 10000) and gender = 'M' |" + + " fields firstname, lastname", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"firstname\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"lastname\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"Amber JOHnny\",\n" + + " \"Duke Willmington\"\n" + + " ],\n" + + " [\n" + + " \"Elinor\",\n" + + " \"Ratliff\"\n" + + " ]\n" + + " ],\n" + + " \"total\": 2,\n" + + " \"size\": 2\n" + + "}", + actual); + } + + @Test + public void testQueryMinusFields() { + String actual = + execute(String.format("source=%s | fields - firstname, lastname", TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"account_number\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + + " \"name\": \"address\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"birthdate\",\n" + + " \"type\": \"timestamp\"\n" + + " },\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"city\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"balance\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + + " \"name\": \"employer\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"state\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age\",\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " {\n" + + " \"name\": \"email\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"male\",\n" + + " \"type\": \"boolean\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 1,\n" + + " \"880 Holmes Lane\",\n" + + " \"2017-10-23 00:00:00\",\n" + + " \"M\",\n" + + " \"Brogan\",\n" + + " 39225,\n" + + " \"Pyrami\",\n" + + " \"IL\",\n" + + " 32,\n" + + " \"amberduke@pyrami.com\",\n" + + " true\n" + + " ],\n" + + " [\n" + + " 6,\n" + + " \"671 Bristol Street\",\n" + + " \"2017-11-20 00:00:00\",\n" + + " \"M\",\n" + + " \"Dante\",\n" + + " 5686,\n" + + " \"Netagy\",\n" + + " \"TN\",\n" + + " 36,\n" + + " \"hattiebond@netagy.com\",\n" + + " true\n" + + " ],\n" + + " [\n" + + " 13,\n" + + " \"789 Madison Street\",\n" + + " \"2018-06-23 00:00:00\",\n" + + " \"F\",\n" + + " \"Nogal\",\n" + + " 32838,\n" + + " \"Quility\",\n" + + " \"VA\",\n" + + " 28,\n" + + " \"nanettebates@quility.com\",\n" + + " false\n" + + " ],\n" + + " [\n" + + " 18,\n" + + " \"467 Hutchinson Court\",\n" + + " \"2018-11-13 23:33:20\",\n" + + " \"M\",\n" + + " \"Orick\",\n" + + " 4180,\n" + + " \"Boink\",\n" + + " \"MD\",\n" + + " 33,\n" + + " \"daleadams@boink.com\",\n" + + " true\n" + + " ],\n" + + " [\n" + + " 20,\n" + + " \"282 Kings Place\",\n" + + " \"2018-06-27 00:00:00\",\n" + + " \"M\",\n" + + " \"Ribera\",\n" + + " 16418,\n" + + " \"Scentric\",\n" + + " \"WA\",\n" + + " 36,\n" + + " \"elinorratliff@scentric.com\",\n" + + " true\n" + + " ],\n" + + " [\n" + + " 25,\n" + + " \"171 Putnam Avenue\",\n" + + " \"2018-08-19 00:00:00\",\n" + + " \"F\",\n" + + " \"Nicholson\",\n" + + " 40540,\n" + + " \"Filodyne\",\n" + + " \"PA\",\n" + + " 39,\n" + + " \"virginiaayala@filodyne.com\",\n" + + " false\n" + + " ],\n" + + " [\n" + + " 32,\n" + + " \"702 Quentin Street\",\n" + + " \"2018-08-11 00:00:00\",\n" + + " \"F\",\n" + + " \"Veguita\",\n" + + " 48086,\n" + + " \"Quailcom\",\n" + + " \"IN\",\n" + + " 34,\n" + + " \"dillardmcpherson@quailcom.com\",\n" + + " false\n" + + " ]\n" + + " ],\n" + + " \"total\": 7,\n" + + " \"size\": 7\n" + + "}", + actual); + } + + // TODO bug: shouldn't return empty + @Ignore + public void testQueryMinusFieldsWithFilter() { + String actual = + execute( + String.format( + "source=%s | where (account_number = 25 or balance > 10000) and gender = 'M' |" + + " fields - firstname, lastname", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"account_number\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + + " \"name\": \"address\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"birthdate\",\n" + + " \"type\": \"timestamp\"\n" + + " },\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"city\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"balance\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + + " \"name\": \"employer\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"state\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"age\",\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " {\n" + + " \"name\": \"email\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"male\",\n" + + " \"type\": \"boolean\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [],\n" + + " \"total\": 0,\n" + + " \"size\": 0\n" + + "}", + actual); + } + + @Test + public void testFieldsPlusThenMinus() { + String actual = + execute( + String.format( + "source=%s | fields + firstname, lastname, account_number | fields - firstname," + + " lastname", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"account_number\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 1\n" + + " ],\n" + + " [\n" + + " 6\n" + + " ],\n" + + " [\n" + + " 13\n" + + " ],\n" + + " [\n" + + " 18\n" + + " ],\n" + + " [\n" + + " 20\n" + + " ],\n" + + " [\n" + + " 25\n" + + " ],\n" + + " [\n" + + " 32\n" + + " ]\n" + + " ],\n" + + " \"total\": 7,\n" + + " \"size\": 7\n" + + "}", + actual); + } + + @Test + public void testSort() { + String actual = + execute( + String.format( + "source=%s | fields + firstname, gender, account_number | sort - account_number", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"firstname\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"account_number\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"Dillard\",\n" + + " \"F\",\n" + + " 32\n" + + " ],\n" + + " [\n" + + " \"Virginia\",\n" + + " \"F\",\n" + + " 25\n" + + " ],\n" + + " [\n" + + " \"Elinor\",\n" + + " \"M\",\n" + + " 20\n" + + " ],\n" + + " [\n" + + " \"Dale\",\n" + + " \"M\",\n" + + " 18\n" + + " ],\n" + + " [\n" + + " \"Nanette\",\n" + + " \"F\",\n" + + " 13\n" + + " ],\n" + + " [\n" + + " \"Hattie\",\n" + + " \"M\",\n" + + " 6\n" + + " ],\n" + + " [\n" + + " \"Amber JOHnny\",\n" + + " \"M\",\n" + + " 1\n" + + " ]\n" + + " ],\n" + + " \"total\": 7,\n" + + " \"size\": 7\n" + + "}", + actual); + } + + @Test + public void testSortTwoFields() { + String actual = + execute( + String.format( + "source=%s | fields + firstname, gender, account_number | sort + gender, -" + + " account_number", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"firstname\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"account_number\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"Dillard\",\n" + + " \"F\",\n" + + " 32\n" + + " ],\n" + + " [\n" + + " \"Virginia\",\n" + + " \"F\",\n" + + " 25\n" + + " ],\n" + + " [\n" + + " \"Nanette\",\n" + + " \"F\",\n" + + " 13\n" + + " ],\n" + + " [\n" + + " \"Elinor\",\n" + + " \"M\",\n" + + " 20\n" + + " ],\n" + + " [\n" + + " \"Dale\",\n" + + " \"M\",\n" + + " 18\n" + + " ],\n" + + " [\n" + + " \"Hattie\",\n" + + " \"M\",\n" + + " 6\n" + + " ],\n" + + " [\n" + + " \"Amber JOHnny\",\n" + + " \"M\",\n" + + " 1\n" + + " ]\n" + + " ],\n" + + " \"total\": 7,\n" + + " \"size\": 7\n" + + "}", + actual); + } + + @Test + public void testSortWithDescAndLimit() { + String actual = + execute( + String.format( + "source=%s | fields + firstname, gender, account_number | sort + gender, -" + + " account_number | head 5", + TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"firstname\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"gender\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"account_number\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " \"Dillard\",\n" + + " \"F\",\n" + + " 32\n" + + " ],\n" + + " [\n" + + " \"Virginia\",\n" + + " \"F\",\n" + + " 25\n" + + " ],\n" + + " [\n" + + " \"Nanette\",\n" + + " \"F\",\n" + + " 13\n" + + " ],\n" + + " [\n" + + " \"Elinor\",\n" + + " \"M\",\n" + + " 20\n" + + " ],\n" + + " [\n" + + " \"Dale\",\n" + + " \"M\",\n" + + " 18\n" + + " ]\n" + + " ],\n" + + " \"total\": 5,\n" + + " \"size\": 5\n" + + "}", + actual); + } + + @Test + public void testMultipleTables() { + String actual = + execute( + String.format("source=%s, %s | stats count() as c", TEST_INDEX_BANK, TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"c\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 14\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } + + @Test + public void testMultipleTablesAndFilters() { + String actual = + execute( + String.format( + "source=%s, %s gender = 'F' | stats count() as c", + TEST_INDEX_BANK, TEST_INDEX_BANK)); + assertEquals( + "{\n" + + " \"schema\": [\n" + + " {\n" + + " \"name\": \"c\",\n" + + " \"type\": \"long\"\n" + + " }\n" + + " ],\n" + + " \"datarows\": [\n" + + " [\n" + + " 6\n" + + " ]\n" + + " ],\n" + + " \"total\": 1,\n" + + " \"size\": 1\n" + + "}", + actual); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java index d758614d1a7..8abd884049a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcUtil.java @@ -9,6 +9,8 @@ import java.sql.SQLException; import java.sql.Types; import lombok.experimental.UtilityClass; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.data.type.ExprCoreType; @@ -16,6 +18,7 @@ @UtilityClass public class JdbcUtil { + private static final Logger LOG = LogManager.getLogger(); public static ExprType getExprTypeFromSqlType(int sqlType) { switch (sqlType) { @@ -38,8 +41,10 @@ public static ExprType getExprTypeFromSqlType(int sqlType) { case Types.VARCHAR: case Types.CHAR: case Types.LONGVARCHAR: - default: return ExprCoreType.STRING; + default: + // TODO unchecked OpenSearchDataType + return ExprCoreType.UNKNOWN; } } @@ -79,6 +84,10 @@ public static ExprValue getExprValueFromSqlType(ResultSet rs, int i, int sqlType break; default: value = rs.getObject(i); + LOG.warn( + "Unchecked sql type: {}, return Object type {}", + sqlType, + value.getClass().getTypeName()); } return ExprValueUtils.fromObjectValue(value); } diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 27f7e4014ba..834549cfa61 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -288,8 +288,9 @@ expression // predicates logicalExpression - : comparisonExpression # comparsion - | NOT logicalExpression # logicalNot + : NOT logicalExpression # logicalNot + | LT_PRTHS logicalExpression RT_PRTHS # parentheticLogicalExpr + | comparisonExpression # comparsion | left = logicalExpression OR right = logicalExpression # logicalOr | left = logicalExpression (AND)? right = logicalExpression # logicalAnd | left = logicalExpression XOR right = logicalExpression # logicalXor diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 819c7814de1..697da32523f 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -154,6 +154,12 @@ public UnresolvedExpression visitParentheticValueExpr(ParentheticValueExprContex return visit(ctx.valueExpression()); // Discard parenthesis around } + @Override + public UnresolvedExpression visitParentheticLogicalExpr( + OpenSearchPPLParser.ParentheticLogicalExprContext ctx) { + return visit(ctx.logicalExpression()); // Discard parenthesis around + } + /** Field expression. */ @Override public UnresolvedExpression visitFieldExpression(FieldExpressionContext ctx) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java index e085b6b72e6..9ee4f81b9eb 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLBasicTest.java @@ -64,7 +64,7 @@ public void testFilterQuery() { verifyPPLToSparkSQL(root, expectedSparkSql); } - @Ignore + @Test public void testFilterQueryWithOr() { String ppl = "source=EMP | where (DEPTNO = 20 or MGR = 30) and SAL > 1000 | fields EMPNO, ENAME"; @@ -84,7 +84,7 @@ public void testFilterQueryWithOr() { verifyPPLToSparkSQL(root, expectedSparkSql); } - @Ignore + @Test public void testFilterQueryWithOr2() { String ppl = "source=EMP (DEPTNO = 20 or MGR = 30) and SAL > 1000 | fields EMPNO, ENAME"; RelNode root = getRelNode(ppl);