diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index c9b3839127d..7355d2f4bf5 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -7,21 +7,36 @@ import java.util.function.BiFunction; import lombok.Getter; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.RelBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.ast.expression.UnresolvedExpression; public class CalcitePlanContext { + public static class OSRelBuilder extends RelBuilder { + + protected OSRelBuilder( + @Nullable Context context, RelOptCluster cluster, @Nullable RelOptSchema relOptSchema) { + super(context, cluster, relOptSchema); + } + } + public FrameworkConfig config; + public CalciteConnection connection; public final RelBuilder relBuilder; public final ExtendedRexBuilder rexBuilder; @Getter private boolean isResolvingJoinCondition = false; - public CalcitePlanContext(FrameworkConfig config) { + public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) { this.config = config; + this.connection = connection; this.relBuilder = RelBuilder.create(config); this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder()); } @@ -35,7 +50,8 @@ public RexNode resolveJoinCondition( return result; } + // for testing only public static CalcitePlanContext create(FrameworkConfig config) { - return new CalcitePlanContext(config); + return new CalcitePlanContext(config, null); } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index bc92c89c638..208228628c8 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -24,7 +24,6 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelBuilder.AggCall; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -63,10 +62,6 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) { @Override public RelNode visitRelation(Relation node, CalcitePlanContext context) { for (QualifiedName qualifiedName : node.getQualifiedNames()) { - SchemaPlus schema = context.config.getDefaultSchema(); - if (schema != null && schema.getName().equals(OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME)) { - schema.unwrap(OpenSearchSchema.class).registerTable(qualifiedName); - } context.relBuilder.scan(qualifiedName.getParts()); } if (node.getQualifiedNames().size() > 1) { diff --git a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java index 90aa6af7f51..642e84929e9 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java +++ b/core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java @@ -23,7 +23,16 @@ public class OpenSearchSchema extends AbstractSchema { private final DataSourceService dataSourceService; - private final Map tableMap = new HashMap<>(); + private final Map tableMap = + new HashMap<>() { + @Override + public Table get(Object key) { + if (!super.containsKey(key)) { + registerTable(new QualifiedName((String) key)); + } + return super.get(key); + } + }; public void registerTable(QualifiedName qualifiedName) { DataSourceSchemaIdentifierNameResolver nameResolver = diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java index 8461b62820d..592fb105f2f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTable.java @@ -6,6 +6,7 @@ package org.opensearch.sql.calcite.plan; import java.lang.reflect.Type; +import org.apache.calcite.adapter.java.AbstractQueryableTable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; @@ -15,16 +16,17 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.QueryableTable; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.TranslatableTable; -import org.apache.calcite.schema.impl.AbstractTable; import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes; -import org.opensearch.sql.data.model.ExprValue; -public abstract class OpenSearchTable extends AbstractTable - implements TranslatableTable, QueryableTable, org.opensearch.sql.storage.Table { +public abstract class OpenSearchTable extends AbstractQueryableTable + implements TranslatableTable, org.opensearch.sql.storage.Table { + + protected OpenSearchTable(Type elementType) { + super(elementType); + } @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { @@ -34,6 +36,8 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { final RelOptCluster cluster = context.getCluster(); + // return new LogicalTableScan(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), + // relOptTable); return new OpenSearchTableScan(cluster, relOptTable, this); } @@ -53,5 +57,5 @@ public Expression getExpression(SchemaPlus schema, String tableName, Class clazz return Schemas.tableExpression(schema, getElementType(), tableName, clazz); } - public abstract Enumerable search(); + public abstract Enumerable search(); } diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index abae853247c..baffc2e4db3 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -11,6 +11,10 @@ import java.util.List; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteJdbc41Factory; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.Driver; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.schema.SchemaPlus; @@ -60,8 +64,22 @@ public void execute( UnresolvedPlan plan, ResponseListener listener) { try { try { - final FrameworkConfig config = buildFrameworkConfig(); - final CalcitePlanContext context = new CalcitePlanContext(config); + // Use simple calcite schema since we don't compute tables in advance of the query. + CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false); + CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); + CalciteConnection connection = + factory.newConnection( + new Driver(), factory, "", new java.util.Properties(), rootSchema, null); + final SchemaPlus defaultSchema = + connection + .getRootSchema() + .add( + OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, + new OpenSearchSchema(dataSourceService)); + // Set opensearch schema as the default schema in config, otherwise we need to explicitly + // add schema path 'OpenSearch' before the opensearch table name + final FrameworkConfig config = buildFrameworkConfig(defaultSchema); + final CalcitePlanContext context = new CalcitePlanContext(config, connection); executePlanByCalcite(analyze(plan, context), context, listener); } catch (Exception e) { LOG.warn("Fallback to V2 query engine since got exception", e); @@ -134,14 +152,10 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) { return relNodeVisitor.analyze(plan, context); } - private FrameworkConfig buildFrameworkConfig() { - final SchemaPlus rootSchema = Frameworks.createRootSchema(true); - final SchemaPlus opensearchSchema = - rootSchema.add( - OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService)); + private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) { return Frameworks.newConfigBuilder() .parserConfig(SqlParser.Config.DEFAULT) // TODO check - .defaultSchema(opensearchSchema) + .defaultSchema(defaultSchema) .traitDefs((List) null) .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)) .build(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 08c5a5e7a2e..0fd65dd9325 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -5,23 +5,29 @@ package org.opensearch.sql.opensearch.executor; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.tools.RelRunners; +import org.apache.calcite.tools.RelRunner; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.ExecutionContext; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.ExecutionEngine.Schema.Column; import org.opensearch.sql.executor.Explain; import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -102,30 +108,48 @@ public ExplainResponseNode visitTableScan( @Override public void execute( RelNode rel, CalcitePlanContext context, ResponseListener listener) { - try (PreparedStatement statement = RelRunners.run(rel)) { + Connection connection = context.connection; + try { + RelRunner runner = connection.unwrap(RelRunner.class); + PreparedStatement statement = runner.prepareStatement(rel); ResultSet result = statement.executeQuery(); - printResultSet(result); + printResultSet(result, listener); } catch (SQLException e) { throw new RuntimeException(e); } } // for testing only - private void printResultSet(ResultSet resultSet) throws SQLException { + private void printResultSet(ResultSet resultSet, ResponseListener listener) + throws SQLException { // Get the ResultSet metadata to know about columns ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); + List values = new ArrayList<>(); // Iterate through the ResultSet while (resultSet.next()) { + Map row = new LinkedHashMap(); // Loop through each column for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); String value = resultSet.getString(i); System.out.println(columnName + ": " + value); + + row.put(columnName, new ExprStringValue(value)); } + values.add(ExprTupleValue.fromExprValueMap(row)); System.out.println("-------------------"); // Separator between rows } + + List columns = new ArrayList<>(metaData.getColumnCount()); + for (int i = 1; i <= columnCount; ++i) { + // TODO: mapping RelDataType to ExprType or deprecate ExprType + columns.add(new Column(metaData.getColumnName(i), null, ExprCoreType.STRING)); + } + Schema schema = new Schema(columns); + QueryResponse response = new QueryResponse(schema, values, null); + listener.onResponse(response); } private RelDataType makeStruct(RelDataTypeFactory typeFactory, RelDataType type) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index c6fda2be886..2ef95173a1f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -11,13 +11,14 @@ import java.util.Map; import java.util.function.Function; import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; +import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.calcite.plan.OpenSearchTable; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -79,6 +80,7 @@ public class OpenSearchIndex extends OpenSearchTable { /** Constructor. */ public OpenSearchIndex(OpenSearchClient client, Settings settings, String indexName) { + super(null); this.client = client; this.settings = settings; this.indexName = new OpenSearchRequest.IndexName(indexName); @@ -190,6 +192,17 @@ public boolean isFieldTypeTolerance() { return settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE); } + // @Override + public Enumerable scan(DataContext root) { + return new AbstractEnumerable<@Nullable Object[]>() { + @Override + public Enumerator<@Nullable Object[]> enumerator() { + return null; + // return search().toMap(v -> new Object[] {v}); + } + }; + } + @VisibleForTesting @RequiredArgsConstructor public static class OpenSearchDefaultImplementor extends DefaultImplementor { @@ -217,10 +230,10 @@ public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { } @Override - public Enumerable search() { - return new AbstractEnumerable() { + public Enumerable search() { + return new AbstractEnumerable() { @Override - public Enumerator enumerator() { + public Enumerator enumerator() { final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); final TimeValue cursorKeepAlive = diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 7a555e84abd..573bb7965cf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -15,7 +15,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; -public class OpenSearchIndexEnumerator implements Enumerator { +public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; @@ -50,9 +50,9 @@ private void fetchNextBatch() { } @Override - public ExprValue current() { + public Object current() { queryCount++; - return iterator.next(); + return iterator.next().tupleValue().values().stream().map(ExprValue::value).toArray(); } @Override