Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
import static org.opensearch.sql.data.type.ExprCoreType.DATE;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.data.type.ExprCoreType.TIME;
Expand Down Expand Up @@ -53,8 +54,10 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand Down Expand Up @@ -162,8 +165,8 @@ public LogicalPlan visitSubqueryAlias(SubqueryAlias node, AnalysisContext contex
STRUCT);
return child;
} else {
// TODO
throw new UnsupportedOperationException("SubqueryAlias is only supported in table alias");
throw new UnsupportedOperationException(
"Subsearch is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}
}

Expand Down Expand Up @@ -685,6 +688,18 @@ public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext con
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

@Override
public LogicalPlan visitJoin(Join node, AnalysisContext context) {
throw new UnsupportedOperationException(
"Join is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

@Override
public LogicalPlan visitLookup(Lookup node, AnalysisContext context) {
throw new UnsupportedOperationException(
"Lookup is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

private LogicalSort buildSort(
LogicalPlan child, AnalysisContext context, List<Field> sortFields) {
ExpressionReferenceOptimizer optimizer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.ast.dsl.AstDSL.and;
import static org.opensearch.sql.ast.dsl.AstDSL.compare;
import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -49,6 +50,9 @@
import org.opensearch.sql.ast.expression.When;
import org.opensearch.sql.ast.expression.WindowFunction;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
Expand Down Expand Up @@ -406,6 +410,24 @@ public Expression visitArgument(Argument node, AnalysisContext context) {
return new NamedArgumentExpression(node.getArgName(), node.getValue().accept(this, context));
}

@Override
public Expression visitScalarSubquery(ScalarSubquery node, AnalysisContext context) {
throw new UnsupportedOperationException(
"Subsearch is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

@Override
public Expression visitExistsSubquery(ExistsSubquery node, AnalysisContext context) {
throw new UnsupportedOperationException(
"Subsearch is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

@Override
public Expression visitInSubquery(InSubquery node, AnalysisContext context) {
throw new UnsupportedOperationException(
"Subsearch is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
}

/**
* If QualifiedName is actually a reserved metadata field, return the expr type associated with
* the metadata field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void execute(
if (shouldUseCalcite(queryType)) {
executeWithCalcite(plan, queryType, listener);
} else {
executeWithLegacy(plan, queryType, listener);
executeWithLegacy(plan, queryType, listener, Optional.empty());
}
}

Expand All @@ -80,7 +81,7 @@ public void explain(
if (shouldUseCalcite(queryType)) {
explainWithCalcite(plan, queryType, listener, format);
} else {
explainWithLegacy(plan, queryType, listener, format);
explainWithLegacy(plan, queryType, listener, format, Optional.empty());
}
}

Expand All @@ -103,7 +104,7 @@ public void executeWithCalcite(
} catch (Throwable t) {
if (isCalciteFallbackAllowed()) {
log.warn("Fallback to V2 query engine since got exception", t);
executeWithLegacy(plan, queryType, listener);
executeWithLegacy(plan, queryType, listener, Optional.of(t));
} else {
if (t instanceof Error) {
// Calcite may throw AssertError during query execution.
Expand Down Expand Up @@ -135,7 +136,7 @@ public void explainWithCalcite(
} catch (Throwable t) {
if (isCalciteFallbackAllowed()) {
log.warn("Fallback to V2 query engine since got exception", t);
explainWithLegacy(plan, queryType, listener, format);
explainWithLegacy(plan, queryType, listener, format, Optional.of(t));
} else {
if (t instanceof Error) {
// Calcite may throw AssertError during query execution.
Expand All @@ -150,11 +151,19 @@ public void explainWithCalcite(
public void executeWithLegacy(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
ResponseListener<ExecutionEngine.QueryResponse> listener,
Optional<Throwable> calciteFailure) {
try {
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
} catch (Exception e) {
listener.onFailure(e);
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed()) {
// if there is a failure thrown from Calcite and execution after fallback V2
// keeps failure, we should throw the failure from Calcite.
calciteFailure.ifPresentOrElse(
t -> listener.onFailure(new RuntimeException(t)), () -> listener.onFailure(e));
} else {
listener.onFailure(e);
}
}
}

Expand All @@ -163,21 +172,31 @@ public void executeWithLegacy(
* explain response.
*
* @param plan {@link UnresolvedPlan}
* @param queryType {@link QueryType}
* @param listener {@link ResponseListener} for explain response
* @param calciteFailure Optional failure thrown from calcite
*/
public void explainWithLegacy(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
Explain.ExplainFormat format) {
Explain.ExplainFormat format,
Optional<Throwable> calciteFailure) {
try {
if (format != null && format != Explain.ExplainFormat.STANDARD) {
throw new UnsupportedOperationException(
"Explain mode " + format.name() + " is not supported in v2 engine");
}
executionEngine.explain(plan(analyze(plan, queryType)), listener);
} catch (Exception e) {
listener.onFailure(e);
if (shouldUseCalcite(queryType) && isCalciteFallbackAllowed()) {
// if there is a failure thrown from Calcite and execution after fallback V2
// keeps failure, we should throw the failure from Calcite.
calciteFailure.ifPresentOrElse(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have IT for this feature?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

t -> listener.onFailure(new RuntimeException(t)), () -> listener.onFailure(e));
} else {
listener.onFailure(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.remote;

import org.opensearch.sql.ppl.NewAddedCommandsIT;

public class CalciteNewAddedCommandsIT extends NewAddedCommandsIT {
@Override
public void init() throws Exception {
super.init();
enableCalcite();
disallowCalciteFallback();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,25 @@ public void testXor() {
verifyDataRows(result, rows("Elinor", 36));
}

@Test
public void testKeepThrowCalciteException() throws IOException {
withFallbackEnabled(
() -> {
IllegalArgumentException e =
assertThrows(
IllegalArgumentException.class,
() ->
executeQuery(
String.format("source=%s | fields firstname1, age", TEST_INDEX_BANK)));
verifyErrorMessageContains(
e,
"field [firstname1] not found; input fields are: [account_number, firstname, address,"
+ " birthdate, gender, city, lastname, balance, employer, state, age, email,"
+ " male, _id, _index, _score, _maxscore, _sort, _routing]");
},
"");
}

@Test
public void testAliasDataType() {
JSONObject result =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ppl;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG;

import java.io.IOException;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.util.TestUtils;

public class NewAddedCommandsIT extends PPLIntegTestCase {
@Override
public void init() throws Exception {
super.init();
loadIndex(Index.BANK);
loadIndex(Index.DOG);
}

@Test
public void testJoin() throws IOException {
JSONObject result;
try {
result =
executeQuery(
String.format(
"search source=%s | join on firstname=holdersName %s",
TEST_INDEX_BANK, TEST_INDEX_DOG));
} catch (ResponseException e) {
result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
verifyQuery(result);
}
}

@Test
public void testLookup() throws IOException {
JSONObject result;
try {
result =
executeQuery(
String.format(
"search source=%s | lookup %s holdersName as firstname",
TEST_INDEX_BANK, TEST_INDEX_DOG));
} catch (ResponseException e) {
result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
}
verifyQuery(result);
}

@Test
public void testSubsearch() throws IOException {
JSONObject result;
try {
result =
executeQuery(
String.format(
"search source=[source=%s | where age>35 | fields age] as t", TEST_INDEX_BANK));
} catch (ResponseException e) {
result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
}
verifyQuery(result);

try {
result =
executeQuery(
String.format(
"search source=%s | where exists [ source=%s | where firstname=holdersName]",
TEST_INDEX_BANK, TEST_INDEX_DOG));
} catch (ResponseException e) {
result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
}
verifyQuery(result);

try {
result =
executeQuery(
String.format(
"search source=%s | where firstname in [ source=%s | fields holdersName]",
TEST_INDEX_BANK, TEST_INDEX_DOG));
} catch (ResponseException e) {
result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
}
verifyQuery(result);

try {
result =
executeQuery(
String.format(
"search source=%s | where firstname = [ source=%s | where holdersName='Hattie'"
+ " | fields holdersName | head 1]",
TEST_INDEX_BANK, TEST_INDEX_DOG));
} catch (ResponseException e) {
result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
}
verifyQuery(result);
}

private void verifyQuery(JSONObject result) throws IOException {
if (isCalciteEnabled()) {
assertFalse(result.getJSONArray("datarows").isEmpty());
} else {
JSONObject error = result.getJSONObject("error");
assertThat(
error.getString("details"),
containsString(
"is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true"));
assertThat(error.getString("type"), equalTo("UnsupportedOperationException"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void execute(
ResultSet result = statement.executeQuery();
buildResultSet(result, rel.getRowType(), listener);
} catch (SQLException e) {
listener.onFailure(e);
throw new RuntimeException(e);
}
return null;
}));
Expand Down
Loading