Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Reverse;
import org.opensearch.sql.ast.tree.Rex;
import org.opensearch.sql.ast.tree.SPath;
import org.opensearch.sql.ast.tree.Search;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
Expand All @@ -95,6 +96,7 @@
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
Expand Down Expand Up @@ -755,11 +757,21 @@ public LogicalPlan visitReverse(Reverse node, AnalysisContext context) {
throw getOnlyForCalciteException("Reverse");
}

@Override
public LogicalPlan visitSpath(SPath node, AnalysisContext context) {
throw getOnlyForCalciteException("Spath");
}

@Override
public LogicalPlan visitTimechart(Timechart node, AnalysisContext context) {
throw getOnlyForCalciteException("Timechart");
}

@Override
public LogicalPlan visitWindow(Window node, AnalysisContext context) {
throw getOnlyForCalciteException("Window");
}

@Override
public LogicalPlan visitRegex(Regex node, AnalysisContext context) {
throw getOnlyForCalciteException("Regex");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.sql.ast.expression.HighlightFunction;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.LambdaFunction;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
Expand Down Expand Up @@ -479,6 +480,11 @@ public Expression visitInSubquery(InSubquery node, AnalysisContext context) {
throw getOnlyForCalciteException("Subsearch");
}

@Override
public Expression visitLambdaFunction(LambdaFunction node, AnalysisContext context) {
throw getOnlyForCalciteException("Lambda function");
}

/**
* If QualifiedName is actually a reserved metadata field, return the expr type associated with
* the metadata field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.calcite;

import static org.apache.calcite.sql.SqlKind.AS;
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.INFORMATION_SCHEMA_NAME;
import static org.opensearch.sql.ast.tree.Join.JoinType.ANTI;
import static org.opensearch.sql.ast.tree.Join.JoinType.SEMI;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
Expand All @@ -20,6 +21,7 @@
import static org.opensearch.sql.calcite.utils.PlanUtils.getRelation;
import static org.opensearch.sql.calcite.utils.PlanUtils.getRexCall;
import static org.opensearch.sql.calcite.utils.PlanUtils.transformPlanToAttachChild;
import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -65,6 +67,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.EmptySourcePropagateVisitor;
import org.opensearch.sql.ast.Node;
Expand Down Expand Up @@ -136,6 +139,7 @@
import org.opensearch.sql.calcite.utils.WildcardUtils;
import org.opensearch.sql.common.patterns.PatternUtils;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.CalciteUnsupportedException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
Expand All @@ -148,10 +152,12 @@ public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalciteP

private final CalciteRexNodeVisitor rexVisitor;
private final CalciteAggCallVisitor aggVisitor;
private final DataSourceService dataSourceService;

public CalciteRelNodeVisitor() {
public CalciteRelNodeVisitor(DataSourceService dataSourceService) {
this.rexVisitor = new CalciteRexNodeVisitor(this);
this.aggVisitor = new CalciteAggCallVisitor(rexVisitor);
this.dataSourceService = dataSourceService;
}

public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
Expand All @@ -160,6 +166,21 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {

@Override
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
DataSourceSchemaIdentifierNameResolver nameResolver =
new DataSourceSchemaIdentifierNameResolver(
dataSourceService, node.getTableQualifiedName().getParts());
if (!nameResolver
.getDataSourceName()
.equals(DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME)) {
throw new CalciteUnsupportedException(
"Datasource " + nameResolver.getDataSourceName() + " is unsupported in Calcite");
}
if (nameResolver.getIdentifierName().equals(DATASOURCES_TABLE_NAME)) {
throw new CalciteUnsupportedException("SHOW DATASOURCES is unsupported in Calcite");
}
if (nameResolver.getSchemaName().equals(INFORMATION_SCHEMA_NAME)) {
throw new CalciteUnsupportedException("information_schema is unsupported in Calcite");
}
context.relBuilder.scan(node.getTableQualifiedName().getParts());
return context.relBuilder.peek();
}
Expand Down Expand Up @@ -1364,7 +1385,7 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
throw new IllegalArgumentException("Number of duplicate events must be greater than 0");
}
if (consecutive) {
throw new UnsupportedOperationException("Consecutive deduplication is not supported");
throw new CalciteUnsupportedException("Consecutive deduplication is unsupported in Calcite");
}
// Columns to deduplicate
List<RexNode> dedupeFields =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.HighlightFunction;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.LambdaFunction;
Expand All @@ -56,6 +57,7 @@
import org.opensearch.sql.ast.expression.Or;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.RelevanceFieldList;
import org.opensearch.sql.ast.expression.ScoreFunction;
import org.opensearch.sql.ast.expression.Span;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.UnresolvedArgument;
Expand Down Expand Up @@ -649,6 +651,16 @@ public RexNode visitWhen(When node, CalcitePlanContext context) {
throw new CalciteUnsupportedException("CastWhen function is unsupported in Calcite");
}

@Override
public RexNode visitHighlightFunction(HighlightFunction node, CalcitePlanContext context) {
throw new CalciteUnsupportedException("Highlight function is unsupported in Calcite");
}

@Override
public RexNode visitScoreFunction(ScoreFunction node, CalcitePlanContext context) {
throw new CalciteUnsupportedException("Score function is unsupported in Calcite");
}

@Override
public RexNode visitRelevanceFieldList(RelevanceFieldList node, CalcitePlanContext context) {
List<RexNode> varArgRexNodeList = new ArrayList<>();
Expand Down
38 changes: 22 additions & 16 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -54,13 +55,12 @@ public class QueryService {
private final Analyzer analyzer;
private final ExecutionEngine executionEngine;
private final Planner planner;

@Getter(lazy = true)
private final CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor();

private DataSourceService dataSourceService;
private Settings settings;

@Getter(lazy = true)
private final CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor(dataSourceService);

/** Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.<br> */
public void execute(
UnresolvedPlan plan,
Expand Down Expand Up @@ -104,7 +104,7 @@ public void executeWithCalcite(
return null;
});
} catch (Throwable t) {
if (isCalciteFallbackAllowed() && !(t instanceof NonFallbackCalciteException)) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
Expand Down Expand Up @@ -140,7 +140,7 @@ public void explainWithCalcite(
return null;
});
} catch (Throwable t) {
if (isCalciteFallbackAllowed()) {
if (isCalciteFallbackAllowed(t)) {
log.warn("Fallback to V2 query engine since got exception", t);
explainWithLegacy(plan, queryType, listener, format, Optional.of(t));
} else {
Expand All @@ -162,7 +162,7 @@ public void executeWithLegacy(
try {
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
} catch (Exception e) {
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed()) {
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed(null)) {
// if there is a failure thrown from Calcite and execution after fallback V2
// keeps failure, we should throw the failure from Calcite.
calciteFailure.ifPresentOrElse(
Expand Down Expand Up @@ -195,7 +195,7 @@ public void explainWithLegacy(
}
executionEngine.explain(plan(analyze(plan, queryType)), listener);
} catch (Exception e) {
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed()) {
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed(null)) {
// if there is a failure thrown from Calcite and execution after fallback V2
// keeps failure, we should throw the failure from Calcite.
calciteFailure.ifPresentOrElse(
Expand Down Expand Up @@ -260,15 +260,21 @@ public RelNode optimize(RelNode plan, CalcitePlanContext context) {
SystemLimitType.QUERY_SIZE_LIMIT, plan, context.relBuilder.literal(context.querySizeLimit));
}

private boolean isCalciteFallbackAllowed() {
if (settings != null) {
Boolean fallback_allowed = settings.getSettingValue(Settings.Key.CALCITE_FALLBACK_ALLOWED);
if (fallback_allowed == null) {
return false;
}
return fallback_allowed;
} else {
private boolean isCalciteFallbackAllowed(@Nullable Throwable t) {
// We always allow fallback the query failed with CalciteUnsupportedException.
// This is for avoiding breaking changes when enable Calcite by default.
if (t instanceof CalciteUnsupportedException) {
Comment on lines +264 to +266
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a list of command or common cases that may trigger fallback? Just a little concern the fallback logic become more complicated (Calcite -> V2 -> legacy).

Copy link
Member Author

@LantaoJin LantaoJin Sep 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a list of command or common cases that may trigger fallback? Just a little concern the fallback logic become more complicated (Calcite -> V2 -> legacy).

Unsupported Functionalities in Calcite Engine

return true;
} else {
if (settings != null) {
Boolean fallback_allowed = settings.getSettingValue(Settings.Key.CALCITE_FALLBACK_ALLOWED);
if (fallback_allowed == null) {
return false;
}
return fallback_allowed;
} else {
return true;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Search;
import org.opensearch.sql.datasource.DataSourceService;

/**
* Simple tests for CalciteRelNodeVisitor.visitSearch method. Tests basic functionality without
Expand All @@ -22,10 +24,11 @@
public class CalciteRelNodeVisitorSearchSimpleTest {

private CalciteRelNodeVisitor visitor;
@Mock DataSourceService dataSourceService;

@BeforeEach
public void setUp() {
visitor = new CalciteRelNodeVisitor();
visitor = new CalciteRelNodeVisitor(dataSourceService);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.sql.ast.expression.LambdaFunction;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.executor.QueryType;

@ExtendWith(MockitoExtension.class)
Expand All @@ -44,6 +45,7 @@ public class CalciteRexNodeVisitorTest {
@Mock RelDataType accType;
@Mock QualifiedName functionArg1;
@Mock QualifiedName functionArg2;
@Mock DataSourceService dataSourceService;

static CalciteRexNodeVisitor visitor;
static CalciteRelNodeVisitor relNodeVisitor;
Expand All @@ -57,7 +59,7 @@ public class CalciteRexNodeVisitorTest {

@BeforeEach
public void setUpContext() {
relNodeVisitor = new CalciteRelNodeVisitor();
relNodeVisitor = new CalciteRelNodeVisitor(dataSourceService);
visitor = new CalciteRexNodeVisitor(relNodeVisitor);
when(relBuilder.getRexBuilder()).thenReturn(rexBuilder);
when(rexBuilder.getTypeFactory()).thenReturn(TYPE_FACTORY);
Expand Down
1 change: 0 additions & 1 deletion docs/category.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
"user/ppl/cmd/dedup.rst",
"user/ppl/cmd/describe.rst",
"user/ppl/cmd/showdatasources.rst",
"user/ppl/cmd/information_schema.rst",
"user/ppl/cmd/eval.rst",
"user/ppl/cmd/fillnull.rst",
"user/ppl/cmd/grok.rst",
Expand Down
18 changes: 7 additions & 11 deletions docs/dev/intro-v3-engine.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# PPL Engine V3 (for 3.0.0)
# PPL Engine V3

---
## 1. Motivations
Expand Down Expand Up @@ -30,6 +30,8 @@ In the initial release of the V3 engine (3.0.0), the main new features focus on
* **[Lookup](../user/ppl/cmd/lookup.rst) Command**
* **[Subquery](../user/ppl/cmd/subquery.rst) Command**

V3 (Calcite integration) engine is enabled by default in 3.3.0.

---
## 3.What are Changed

Expand All @@ -51,7 +53,8 @@ Because of implementation changed internally, following behaviors are changed fr

### 3.2 Fallback Mechanism

As v3 engine is experimental in 3.0.0, not all PPL commands could work under this new engine. Those unsupported queries will be forwarded to V2 engine by fallback mechanism. To avoid impact on your side, normally you won't see any difference in a query response. If you want to check if and why your query falls back to be handled by V2 engine, please check OpenSearch log for "Fallback to V2 query engine since ...".
- As v3 engine is experimental in 3.0.0, not all PPL commands could work under this new engine. Those unsupported queries will be forwarded to V2 engine by fallback mechanism. To avoid impact on your side, normally you won't see any difference in a query response. If you want to check if and why your query falls back to be handled by V2 engine, please check OpenSearch log for "Fallback to V2 query engine since ...".
- Since 3.2.0, the fallback mechanism is disabled by default, to enable fallback, set `plugins.calcite.fallback.allowed=true`.

### 3.3 Limitations

Expand All @@ -64,22 +67,15 @@ For the following commands or functions, we add some defensive restrictions to e

For the following functionalities in V3 engine, the query will be forwarded to the V2 query engine and thus you cannot use new features in [2. What's New](#2-whats-new).

#### Unsupported functionalities
#### Unsupported functionalities (up to latest)
- All SQL queries
- PPL queries against non-OpenSearch data sources
- `dedup` with `consecutive=true`
- Search relevant commands
- AD
- ML
- Kmeans
- Commands with `fetch_size` parameter
- Search relevant functions
- match
- match_phrase
- match_bool_prefix
- match_phrase_prefix
- simple_query_string
- query_string
- multi_match
- [Existed limitations of V2](intro-v2-engine.md#33-limitations)

---
Expand Down
7 changes: 4 additions & 3 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,10 @@ Description

You can enable Calcite as new query optimizer and execution engine to all coming requests.

1. The default value is false since 3.0.0.
2. This setting is node scope.
3. This setting can be updated dynamically.
1. The default value is false in 3.0, 3.1 and 3.2.
2. The default value is true since 3.3.0.
3. This setting is node scope.
4. This setting can be updated dynamically.

Check `introduce v3 engine <../../../dev/intro-v3-engine.md>`_ for more details.
Check `join doc <../../ppl/cmd/join.rst>`_ for example.
Expand Down
4 changes: 2 additions & 2 deletions docs/user/dql/basics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1153,8 +1153,8 @@ Offset position can be given following the OFFSET keyword as well, here is an ex
+-----+


Limitation
----------
Limitations
-----------
Generally, sort plan is pushed down into the OpenSearch DSL in plan optimization, but note that if a query has complex sorting, like sort expression, which would not be pushed down during optimization (see `Optimizations <../optimization/optimization.rst>`_ for details), but computed in local memory. However, the engine fetches the index of a default size that is set in plugin setting (See `Settings <../admin/settings.rst>` plugins.query.size_limit for details). Therefore, the result might not be absolutely correct if the index size is larger than the default size of index scan. For example, the engine has a index scan size of 200 and the index size is 500. Then a query with limit 300 can only fetch 200 rows of the index, compute and return the sorted result with 200 rows, while the rest 300 rows of the index are ignored and would not be fetched into the engine. To get an absolutely correct result, it is suggested to set the query size limit to a larger value before run the query.


Loading
Loading