Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,15 @@ allprojects {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.9.10"
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10"
resolutionStrategy.force "net.bytebuddy:byte-buddy:1.14.9"
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.3.1"
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.5'
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.5'
resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
resolutionStrategy.force 'com.fasterxml.jackson:jackson-bom:2.17.2'
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5'
resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0'
resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum Key {

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),

/** Query Settings. */
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),
Expand Down
5 changes: 2 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,14 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.datasource.model.DataSource',
'org.opensearch.sql.datasource.model.DataSourceStatus',
'org.opensearch.sql.datasource.model.DataSourceType',
'org.opensearch.sql.executor.ExecutionEngine'
]
limit {
counter = 'LINE'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
limit {
counter = 'BRANCH'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,29 @@

package org.opensearch.sql.calcite;

import java.sql.Connection;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;

public class CalcitePlanContext {

public FrameworkConfig config;
public final Connection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config) {
private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
this.config = config;
this.relBuilder = RelBuilder.create(config);
this.connection = CalciteToolsHelper.connect(config, typeFactory);
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}

Expand All @@ -35,8 +40,11 @@ public RexNode resolveJoinCondition(
return result;
}

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config);
return new CalcitePlanContext(config, null);
}

public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
return new CalcitePlanContext(config, typeFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
@Override
public RelNode visitProject(Project node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> projectList =
node.getProjectList().stream()
.filter(expr -> !(expr instanceof AllFields))
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());
if (projectList.isEmpty()) {
List<RexNode> projectList;
if (node.getProjectList().stream().anyMatch(e -> e instanceof AllFields)) {
return context.relBuilder.peek();
} else {
projectList =
node.getProjectList().stream()
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());
}
if (node.isExcluded()) {
context.relBuilder.projectExcept(projectList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;

public abstract class OpenSearchTable extends AbstractQueryableTable
implements TranslatableTable, org.opensearch.sql.storage.Table {
Expand All @@ -27,7 +27,7 @@ protected OpenSearchTable(Type elementType) {

@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
return OpenSearchRelDataTypes.convertSchema(this);
return OpenSearchTypeFactory.convertSchema(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add calcite's license header after ours since we copy code from that repo.

There is similar example in

*/

package org.opensearch.sql.calcite.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteFactory;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.Util;
import org.opensearch.sql.calcite.CalcitePlanContext;

/**
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
* 3. RelBuilder 4. RelRunner TODO delete it in future if possible.
*/
public class CalciteToolsHelper {

/** Create a RelBuilder with testing */
public static RelBuilder create(FrameworkConfig config) {
return RelBuilder.create(config);
}

/** Create a RelBuilder with typeFactory */
public static RelBuilder create(
FrameworkConfig config, JavaTypeFactory typeFactory, Connection connection) {
return withPrepare(
config,
typeFactory,
connection,
(cluster, relOptSchema, rootSchema, statement) ->
new OpenSearchRelBuilder(config.getContext(), cluster, relOptSchema));
}

public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFactory) {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
try {
return new OpenSearchDriver().connect("jdbc:calcite:", info, null, typeFactory);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

/**
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
* Frameworks.BasePrepareAction)}. The purpose is the method {@link
* CalciteFactory#newConnection(UnregisteredDriver, AvaticaFactory, String, Properties)} create
* connection with null instance of JavaTypeFactory. So we add a parameter JavaTypeFactory.
*/
private static <R> R withPrepare(
FrameworkConfig config,
JavaTypeFactory typeFactory,
Connection connection,
Frameworks.BasePrepareAction<R> action) {
try {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
final CalciteServerStatement statement =
connection.createStatement().unwrap(CalciteServerStatement.class);
return new OpenSearchPrepareImpl().perform(statement, config, typeFactory, action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static class OpenSearchDriver extends Driver {

public Connection connect(
String url, Properties info, CalciteSchema rootSchema, JavaTypeFactory typeFactory)
throws SQLException {
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
AvaticaConnection connection =
factory.newConnection((Driver) this, factory, url, info, rootSchema, typeFactory);
this.handler.onConnectionInit(connection);
return connection;
}
}

/** do nothing, just extend for a public construct for new */
public static class OpenSearchRelBuilder extends RelBuilder {
public OpenSearchRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
super(context, cluster, relOptSchema);
}
}

public static class OpenSearchPrepareImpl extends CalcitePrepareImpl {
/**
* Similar to {@link CalcitePrepareImpl#perform(CalciteServerStatement, FrameworkConfig,
* Frameworks.BasePrepareAction)}, but with a custom typeFactory.
*/
public <R> R perform(
CalciteServerStatement statement,
FrameworkConfig config,
JavaTypeFactory typeFactory,
Frameworks.BasePrepareAction<R> action) {
final CalcitePrepare.Context prepareContext = statement.createPrepareContext();
SchemaPlus defaultSchema = config.getDefaultSchema();
final CalciteSchema schema =
defaultSchema != null
? CalciteSchema.from(defaultSchema)
: prepareContext.getRootSchema();
CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
schema.root(), schema.path(null), typeFactory, prepareContext.config());
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
final RelOptPlanner planner =
createPlanner(prepareContext, config.getContext(), config.getCostFactory());
final RelOptCluster cluster = createCluster(planner, rexBuilder);
return action.apply(cluster, catalogReader, prepareContext.getRootSchema().plus(), statement);
}
}

public static class OpenSearchRelRunners {
/**
* Runs a relational expression by existing connection. This class copied from {@link
* org.apache.calcite.tools.RelRunners#run(RelNode)}
*/
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
final RelShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan
&& Bindables.BindableTableScan.canHandle(table)) {
// Always replace the LogicalTableScan with BindableTableScan
// because it's implementation does not require a "schema" as context.
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
rel = rel.accept(shuttle);
// the line we changed here
try (Connection connection = context.connection) {
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(rel);
} catch (SQLException e) {
throw Util.throwAsRuntime(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.calcite.utils;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -16,11 +17,12 @@
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.storage.Table;

public class OpenSearchRelDataTypes extends JavaTypeFactoryImpl {
public static final OpenSearchRelDataTypes TYPE_FACTORY =
new OpenSearchRelDataTypes(RelDataTypeSystem.DEFAULT);
/** This class is used to create RelDataType and map RelDataType to Java data type */
public class OpenSearchTypeFactory extends JavaTypeFactoryImpl {
public static final OpenSearchTypeFactory TYPE_FACTORY =
new OpenSearchTypeFactory(RelDataTypeSystem.DEFAULT);

private OpenSearchRelDataTypes(RelDataTypeSystem typeSystem) {
private OpenSearchTypeFactory(RelDataTypeSystem typeSystem) {
super(typeSystem);
}

Expand Down Expand Up @@ -108,8 +110,14 @@ public static RelDataType convertSchema(Table table) {
List<RelDataType> typeList = new ArrayList<>();
for (Map.Entry<String, ExprType> entry : table.getFieldTypes().entrySet()) {
fieldNameList.add(entry.getKey());
typeList.add(OpenSearchRelDataTypes.convertSchemaField(entry.getValue()));
typeList.add(OpenSearchTypeFactory.convertSchemaField(entry.getValue()));
}
return TYPE_FACTORY.createStructType(typeList, fieldNameList, true);
}

/** not in use for now, but let's keep this code for future reference. */
@Override
public Type getJavaClass(RelDataType type) {
return super.getJavaClass(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ public boolean equals(Object o) {

/** The expression value equal. */
public abstract boolean equal(ExprValue other);

@Override
public Object valueForCalcite() {
return value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -41,6 +42,11 @@ public String value() {
return DateTimeFormatter.ISO_LOCAL_DATE.format(date);
}

@Override
public Long valueForCalcite() {
return date.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
}

@Override
public ExprType type() {
return ExprCoreType.DATE;
Expand Down
Loading
Loading