From 89861b5c03f297986211c6a1145c51e50906086b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 16 Jun 2025 15:56:21 -0700 Subject: [PATCH 1/6] Add api module with API and UT Signed-off-by: Chen Dai --- api/build.gradle | 76 ++++++++ .../sql/api/UnifiedQueryPlanner.java | 168 ++++++++++++++++++ .../sql/api/UnifiedQueryPlannerTest.java | 71 ++++++++ build.gradle | 2 +- settings.gradle | 1 + 5 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 api/build.gradle create mode 100644 api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java create mode 100644 api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java diff --git a/api/build.gradle b/api/build.gradle new file mode 100644 index 00000000000..4d426d8fe31 --- /dev/null +++ b/api/build.gradle @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java-library' + id 'jacoco' + id 'com.diffplug.spotless' version '6.22.0' +} + +dependencies { + api project(':ppl') + + testImplementation group: 'junit', name: 'junit', version: '4.13.2' + testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}" + testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}" + testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.38.0' + // testImplementation(testFixtures(project(":core"))) +} + +spotless { + java { + target fileTree('.') { + include '**/*.java' + exclude '**/build/**', '**/build-*/**', 'src/main/gen/**' + } + importOrder() +// licenseHeader("/*\n" + +// " * Copyright OpenSearch Contributors\n" + +// " * SPDX-License-Identifier: Apache-2.0\n" + +// " */\n\n") + removeUnusedImports() + trimTrailingWhitespace() + endWithNewline() + googleJavaFormat('1.17.0').reflowLongStrings().groupArtifact('com.google.googlejavaformat:google-java-format') + } +} + +test { + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } +} + +jacocoTestReport { + reports { + html.required = true + xml.required = true + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, + exclude: ['**/antlr/parser/**']) + })) + } +} +test.finalizedBy(project.tasks.jacocoTestReport) +jacocoTestCoverageVerification { + violationRules { + rule { + limit { + minimum = 0.9 + } + + } + } + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, + exclude: ['**/antlr/parser/**']) + })) + } +} +check.dependsOn jacocoTestCoverageVerification diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java new file mode 100644 index 00000000000..b4802775870 --- /dev/null +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -0,0 +1,168 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.antlr.v4.runtime.tree.ParseTree; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.opensearch.sql.ast.statement.Query; +import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.CalciteRelNodeVisitor; +import org.opensearch.sql.common.antlr.Parser; +import org.opensearch.sql.executor.OpenSearchTypeSystem; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; +import org.opensearch.sql.ppl.parser.AstBuilder; +import org.opensearch.sql.ppl.parser.AstStatementBuilder; + +/** + * {@code UnifiedQueryPlanner} provides a high-level API for parsing and analyzing queries using the + * Calcite-based query engine. It serves as the primary integration point for external consumers + * such as Spark or command-line tools, abstracting away Calcite internals. + */ +public class UnifiedQueryPlanner { + /** The type of query language being used (e.g., PPL). */ + private final QueryType queryType; + + /** The parser instance responsible for converting query text into a parse tree. */ + private final Parser parser; + + /** Calcite framework configuration used during logical plan construction. */ + private final FrameworkConfig config; + + /** AST-to-RelNode visitor that builds logical plans from the parsed AST. */ + private final CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor(); + + /** + * Constructs a UnifiedQueryPlanner for a given query type and schema root. + * + * @param queryType the query language type (e.g., PPL) + * @param rootSchema the root Calcite schema containing all catalogs and tables + */ + public UnifiedQueryPlanner(QueryType queryType, SchemaPlus rootSchema) { + this.queryType = queryType; + this.parser = buildQueryParser(queryType); + this.config = buildCalciteConfig(rootSchema); + } + + /** + * Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate + * optimal physical plan to fully unify query execution and leverage Calcite's optimzer. + * + * @param query the raw query string in PPL or other supported syntax + * @return a logical plan representing the query + */ + public RelNode plan(String query) { + // Parse query and build AST + ParseTree cst = parser.parse(query); + AstStatementBuilder astStmtBuilder = + new AstStatementBuilder( + new AstBuilder(query), AstStatementBuilder.StatementBuilderContext.builder().build()); + Statement statement = cst.accept(astStmtBuilder); + UnresolvedPlan ast = ((Query) statement).getPlan(); + + // Analyze the ASt and convert to Calcite logical plan + CalcitePlanContext calcitePlanContext = CalcitePlanContext.create(config, 100, queryType); + RelNode logical = relNodeVisitor.analyze(ast, calcitePlanContext); + + // Add redundant sort if necessary to preserve collation. + RelNode calcitePlan = logical; + RelCollation collation = logical.getTraitSet().getCollation(); + if (!(logical instanceof Sort) && collation != RelCollations.EMPTY) { + calcitePlan = LogicalSort.create(logical, collation, null, null); + } + return calcitePlan; + } + + private Parser buildQueryParser(QueryType queryType) { + if (queryType == QueryType.PPL) { + return new PPLSyntaxParser(); + } + throw new UnsupportedOperationException("Unsupported query type: " + queryType); + } + + private FrameworkConfig buildCalciteConfig(SchemaPlus defaultSchema) { + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) // TODO check + .defaultSchema(defaultSchema) + .traitDefs((List) null) + .programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE)) + .typeSystem(OpenSearchTypeSystem.INSTANCE) + .build(); + } + + /** Builder for {@link UnifiedQueryPlanner}, supporting declarative fluent API. */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link UnifiedQueryPlanner}, supporting both declarative and dynamic schema + * registration for use in query planning. + */ + public static class Builder { + private QueryType queryType; + private SchemaPlus rootSchema; + + public Builder() { + this.rootSchema = CalciteSchema.createRootSchema(true, false).plus(); + } + + /** + * Sets the query language frontend to be used by the planner. + * + * @param queryType the {@link QueryType}, such as PPL + * @return this builder instance + */ + public Builder language(QueryType queryType) { + this.queryType = queryType; + return this; + } + + /** + * Registers a catalog and its databases. + * + * @param catalogName the name of the catalog + * @param databases a map of database name → schema + * @return this builder instance + */ + public Builder catalog(String catalogName, Map databases) { + SchemaPlus catalog = rootSchema.add(catalogName, new AbstractSchema()); + for (Map.Entry e : databases.entrySet()) { + catalog.add(e.getKey(), e.getValue()); + } + return this; + } + + /** + * Builds a {@link UnifiedQueryPlanner} with the configuration. + * + * @return a new instance of {@link UnifiedQueryPlanner} + */ + public UnifiedQueryPlanner build() { + Objects.requireNonNull(queryType, "Must specify language before build"); + return new UnifiedQueryPlanner(queryType, rootSchema); + } + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java new file mode 100644 index 00000000000..9a1e39d6883 --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import static org.junit.Assert.assertNotNull; + +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; +import org.opensearch.sql.executor.QueryType; + +public class UnifiedQueryPlannerTest { + + /** Test database with a test table with id and name columns */ + private AbstractSchema testDatabase = + new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of( + "test", + new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + final RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER); + final RelDataType stringType = typeFactory.createSqlType(SqlTypeName.VARCHAR); + return typeFactory.createStructType( + List.of(intType, stringType), List.of("id", "name")); + } + }); + } + }; + + @Test + public void testSimpleQuery() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", Map.of("default", testDatabase)) + .build(); + + RelNode plan = planner.plan("source = opensearch.default.test | eval f = abs(123)"); + assertNotNull("Plan should not be null", plan); + } + + @Test + public void testJoinQuery() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", Map.of("default", testDatabase)) + .catalog("spark_catalog", Map.of("default", testDatabase)) + .build(); + + RelNode plan = + planner.plan( + "source = opensearch.default.test |" + + "lookup spark_catalog.default.test id |" + + "eval f = abs(123)"); + assertNotNull("Plan should not be null", plan); + } +} diff --git a/build.gradle b/build.gradle index e57e3522121..c537bab8a6f 100644 --- a/build.gradle +++ b/build.gradle @@ -162,7 +162,7 @@ subprojects { } // Publish internal modules as Maven artifacts for external use, such as by opensearch-spark and opensearch-cli. - def publishedModules = ['sql', 'ppl', 'core', 'opensearch', 'common', 'protocol'] + def publishedModules = ['api', 'sql', 'ppl', 'core', 'opensearch', 'common', 'protocol'] if (publishedModules.contains(name)) { apply plugin: 'java-library' apply plugin: 'maven-publish' diff --git a/settings.gradle b/settings.gradle index fc4bc43f463..9b7284aa965 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,6 +21,7 @@ include 'datasources' include 'async-query-core' include 'async-query' include 'language-grammar' +include 'api' // exclude integ-test/doctest in case of offline build since they need downloads if (!gradle.startParameter.offline) { From 197f7d9bde1f5ab0966708a37a829a9db1203b5b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Jun 2025 14:52:57 -0700 Subject: [PATCH 2/6] Refactor catalog API and clean up build.gradle Signed-off-by: Chen Dai --- api/build.gradle | 5 -- .../sql/api/UnifiedQueryPlanner.java | 60 ++++++++++--------- .../sql/api/UnifiedQueryPlannerTest.java | 26 ++++---- 3 files changed, 45 insertions(+), 46 deletions(-) diff --git a/api/build.gradle b/api/build.gradle index 4d426d8fe31..a87b4f03cbf 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -16,7 +16,6 @@ dependencies { testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}" testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}" testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.38.0' - // testImplementation(testFixtures(project(":core"))) } spotless { @@ -26,10 +25,6 @@ spotless { exclude '**/build/**', '**/build-*/**', 'src/main/gen/**' } importOrder() -// licenseHeader("/*\n" + -// " * Copyright OpenSearch Contributors\n" + -// " * SPDX-License-Identifier: Apache-2.0\n" + -// " */\n\n") removeUnusedImports() trimTrailingWhitespace() endWithNewline() diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java index b4802775870..093ae9a30c5 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -6,7 +6,6 @@ package org.opensearch.sql.api; import java.util.List; -import java.util.Map; import java.util.Objects; import org.antlr.v4.runtime.tree.ParseTree; import org.apache.calcite.jdbc.CalciteSchema; @@ -19,7 +18,6 @@ import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; @@ -74,25 +72,7 @@ public UnifiedQueryPlanner(QueryType queryType, SchemaPlus rootSchema) { * @return a logical plan representing the query */ public RelNode plan(String query) { - // Parse query and build AST - ParseTree cst = parser.parse(query); - AstStatementBuilder astStmtBuilder = - new AstStatementBuilder( - new AstBuilder(query), AstStatementBuilder.StatementBuilderContext.builder().build()); - Statement statement = cst.accept(astStmtBuilder); - UnresolvedPlan ast = ((Query) statement).getPlan(); - - // Analyze the ASt and convert to Calcite logical plan - CalcitePlanContext calcitePlanContext = CalcitePlanContext.create(config, 100, queryType); - RelNode logical = relNodeVisitor.analyze(ast, calcitePlanContext); - - // Add redundant sort if necessary to preserve collation. - RelNode calcitePlan = logical; - RelCollation collation = logical.getTraitSet().getCollation(); - if (!(logical instanceof Sort) && collation != RelCollations.EMPTY) { - calcitePlan = LogicalSort.create(logical, collation, null, null); - } - return calcitePlan; + return preserveCollation(analyze(parse(query))); } private Parser buildQueryParser(QueryType queryType) { @@ -112,6 +92,29 @@ private FrameworkConfig buildCalciteConfig(SchemaPlus defaultSchema) { .build(); } + private UnresolvedPlan parse(String query) { + ParseTree cst = parser.parse(query); + AstStatementBuilder astStmtBuilder = + new AstStatementBuilder( + new AstBuilder(query), AstStatementBuilder.StatementBuilderContext.builder().build()); + Statement statement = cst.accept(astStmtBuilder); + return ((Query) statement).getPlan(); + } + + private RelNode analyze(UnresolvedPlan ast) { + CalcitePlanContext calcitePlanContext = CalcitePlanContext.create(config, 100, queryType); + return relNodeVisitor.analyze(ast, calcitePlanContext); + } + + private RelNode preserveCollation(RelNode logical) { + RelNode calcitePlan = logical; + RelCollation collation = logical.getTraitSet().getCollation(); + if (!(logical instanceof Sort) && collation != RelCollations.EMPTY) { + calcitePlan = LogicalSort.create(logical, collation, null, null); + } + return calcitePlan; + } + /** Builder for {@link UnifiedQueryPlanner}, supporting declarative fluent API. */ public static Builder builder() { return new Builder(); @@ -141,17 +144,16 @@ public Builder language(QueryType queryType) { } /** - * Registers a catalog and its databases. + * Registers a catalog with the specified name and its associated schema. The schema can be a + * flat or nested structure (e.g., catalog → schema → table), depending on how data is + * organized. * - * @param catalogName the name of the catalog - * @param databases a map of database name → schema + * @param name the name of the catalog to register + * @param schema the schema representing the structure of the catalog * @return this builder instance */ - public Builder catalog(String catalogName, Map databases) { - SchemaPlus catalog = rootSchema.add(catalogName, new AbstractSchema()); - for (Map.Entry e : databases.entrySet()) { - catalog.add(e.getKey(), e.getValue()); - } + public Builder catalog(String name, Schema schema) { + rootSchema.add(name, schema); return this; } diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java index 9a1e39d6883..d8d85d34154 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertNotNull; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelNode; @@ -22,13 +23,14 @@ public class UnifiedQueryPlannerTest { /** Test database with a test table with id and name columns */ - private AbstractSchema testDatabase = + private AbstractSchema testSchema = new AbstractSchema() { @Override protected Map getTableMap() { - return Map.of( - "test", - new AbstractTable() { + return new HashMap<>() { + @Override + public Table get(Object key) { + return new AbstractTable() { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { final RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER); @@ -36,7 +38,9 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { return typeFactory.createStructType( List.of(intType, stringType), List.of("id", "name")); } - }); + }; + } + }; } }; @@ -45,10 +49,10 @@ public void testSimpleQuery() { UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) - .catalog("opensearch", Map.of("default", testDatabase)) + .catalog("opensearch", testSchema) .build(); - RelNode plan = planner.plan("source = opensearch.default.test | eval f = abs(123)"); + RelNode plan = planner.plan("source = opensearch.test | eval f = abs(id)"); assertNotNull("Plan should not be null", plan); } @@ -57,15 +61,13 @@ public void testJoinQuery() { UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) - .catalog("opensearch", Map.of("default", testDatabase)) - .catalog("spark_catalog", Map.of("default", testDatabase)) + .catalog("opensearch", testSchema) + .catalog("spark_catalog", testSchema) .build(); RelNode plan = planner.plan( - "source = opensearch.default.test |" - + "lookup spark_catalog.default.test id |" - + "eval f = abs(123)"); + "source = opensearch.test |" + "lookup spark_catalog.test id |" + "eval f = abs(id)"); assertNotNull("Plan should not be null", plan); } } From 89d9394b920c4ff506dd35ef65291bf279ca3613 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Jun 2025 15:12:40 -0700 Subject: [PATCH 3/6] Add cache schema API and refactor UT Signed-off-by: Chen Dai --- .../sql/api/UnifiedQueryPlanner.java | 26 ++++++-- .../sql/api/UnifiedQueryPlannerTest.java | 64 +++++++++++++------ 2 files changed, 62 insertions(+), 28 deletions(-) diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java index 093ae9a30c5..8b824af6dc2 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -5,7 +5,9 @@ package org.opensearch.sql.api; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import org.antlr.v4.runtime.tree.ParseTree; import org.apache.calcite.jdbc.CalciteSchema; @@ -79,7 +81,7 @@ private Parser buildQueryParser(QueryType queryType) { if (queryType == QueryType.PPL) { return new PPLSyntaxParser(); } - throw new UnsupportedOperationException("Unsupported query type: " + queryType); + throw new IllegalArgumentException("Unsupported query type: " + queryType); } private FrameworkConfig buildCalciteConfig(SchemaPlus defaultSchema) { @@ -125,12 +127,9 @@ public static Builder builder() { * registration for use in query planning. */ public static class Builder { + private final Map catalogs = new HashMap<>(); private QueryType queryType; - private SchemaPlus rootSchema; - - public Builder() { - this.rootSchema = CalciteSchema.createRootSchema(true, false).plus(); - } + private boolean cacheSchema; /** * Sets the query language frontend to be used by the planner. @@ -153,7 +152,18 @@ public Builder language(QueryType queryType) { * @return this builder instance */ public Builder catalog(String name, Schema schema) { - rootSchema.add(name, schema); + catalogs.put(name, schema); + return this; + } + + /** + * Enables or disables schema caching in the root schema. + * + * @param cache whether to enable schema caching + * @return this builder instance + */ + public Builder cacheSchema(boolean cache) { + this.cacheSchema = cache; return this; } @@ -164,6 +174,8 @@ public Builder catalog(String name, Schema schema) { */ public UnifiedQueryPlanner build() { Objects.requireNonNull(queryType, "Must specify language before build"); + SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheSchema).plus(); + catalogs.forEach(rootSchema::add); return new UnifiedQueryPlanner(queryType, rootSchema); } } diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java index d8d85d34154..422aff377de 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java @@ -7,7 +7,6 @@ import static org.junit.Assert.assertNotNull; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelNode; @@ -22,30 +21,28 @@ public class UnifiedQueryPlannerTest { - /** Test database with a test table with id and name columns */ - private AbstractSchema testSchema = + /** Test schema consists of a test table with id and name columns */ + private final AbstractSchema testSchema = new AbstractSchema() { @Override protected Map getTableMap() { - return new HashMap<>() { - @Override - public Table get(Object key) { - return new AbstractTable() { + return Map.of( + "test", + new AbstractTable() { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { - final RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER); - final RelDataType stringType = typeFactory.createSqlType(SqlTypeName.VARCHAR); return typeFactory.createStructType( - List.of(intType, stringType), List.of("id", "name")); + List.of( + typeFactory.createSqlType(SqlTypeName.INTEGER), + typeFactory.createSqlType(SqlTypeName.VARCHAR)), + List.of("id", "name")); } - }; - } - }; + }); } }; @Test - public void testSimpleQuery() { + public void testPPLQueryPlanning() { UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) @@ -53,21 +50,46 @@ public void testSimpleQuery() { .build(); RelNode plan = planner.plan("source = opensearch.test | eval f = abs(id)"); - assertNotNull("Plan should not be null", plan); + assertNotNull("Plan should be created", plan); } @Test - public void testJoinQuery() { + public void testPPLQueryPlanningWithMultipleCatalogs() { UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) - .catalog("opensearch", testSchema) - .catalog("spark_catalog", testSchema) + .catalog("catalog1", testSchema) + .catalog("catalog2", testSchema) .build(); RelNode plan = - planner.plan( - "source = opensearch.test |" + "lookup spark_catalog.test id |" + "eval f = abs(id)"); - assertNotNull("Plan should not be null", plan); + planner.plan("source = catalog1.test | lookup catalog2.test id | eval f = abs(id)"); + assertNotNull("Plan should be created with multiple catalogs", plan); + } + + @Test + public void testSchemaCaching() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", testSchema) + .cacheSchema(true) + .build(); + + RelNode plan = planner.plan("source = opensearch.test | eval f = abs(id)"); + assertNotNull("Planner should work with caching enabled", plan); + } + + @Test(expected = NullPointerException.class) + public void testMissingQueryLanguage() { + UnifiedQueryPlanner.builder().catalog("opensearch", testSchema).build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testUnsupportedQueryLanguage() { + UnifiedQueryPlanner.builder() + .language(QueryType.SQL) // only PPL is supported + .catalog("opensearch", testSchema) + .build(); } } From 1308441da2e269a4b6c1b4893aae4d0b653b43ff Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Jun 2025 15:40:20 -0700 Subject: [PATCH 4/6] Add readme Signed-off-by: Chen Dai --- api/README.md | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 api/README.md diff --git a/api/README.md b/api/README.md new file mode 100644 index 00000000000..e13258e7946 --- /dev/null +++ b/api/README.md @@ -0,0 +1,74 @@ +# Unified Query API + +This module provides a high-level integration layer for the Calcite-based query engine, enabling external systems such as Apache Spark or command-line tools to parse and analyze queries without exposing low-level internals. + +## Overview + +The `UnifiedQueryPlanner` serves as the primary entry point for external consumers. It accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as intermediate representation. + +## Usage + +Use the declarative, fluent builder API to initialize the `UnifiedQueryPlanner`. + +```java +UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", schema) + .cacheSchema(true) + .build(); + +RelNode plan = planner.plan("source = opensearch.test"); +``` + +## Development & Testing + +A set of unit tests is provided to validate planner behavior. + +To run tests: + +``` +./gradlew :api:test +``` + +## Integration Guide + +This guide walks through how to integrate unified query planner into your application. + +### Step 1: Add Dependency + +The module is currently published as a snapshot to the AWS Sonatype Snapshots repository. To include it as a dependency in your project, add the following to your `pom.xml` or `build.gradle`: + +```xml + + org.opensearch.query + unified-query-api + YOUR_VERSION_HERE + +``` + +### Step 2: Implement a Calcite Schema + +You must implement the Calcite `Schema` interface and register them using the fluent `catalog()` method on the builder. + +```java +public class MySchema extends AbstractSchema { + @Override + protected Map getTableMap() { + return Map.of( + "test_table", + new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory.createStructType( + List.of(typeFactory.createSqlType(SqlTypeName.INTEGER)), + List.of("id")); + } + }); + } +} +``` + +## Future Work + +- Expand support to SQL language. +- Extend planner to generate optimized physical plans using Calcite's optimization frameworks. From f124aaa06067f97b44b94550e9f9dd93cf7dfaf2 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Jun 2025 16:44:57 -0700 Subject: [PATCH 5/6] Add comment for hardcoding query size limit Signed-off-by: Chen Dai --- .../main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java | 3 ++- .../java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java | 4 ++-- settings.gradle | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java index 8b824af6dc2..70cc2d56d04 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -104,7 +104,8 @@ private UnresolvedPlan parse(String query) { } private RelNode analyze(UnresolvedPlan ast) { - CalcitePlanContext calcitePlanContext = CalcitePlanContext.create(config, 100, queryType); + // TODO: Hardcoded query size limit (10000) for now as only logical plan is generated. + CalcitePlanContext calcitePlanContext = CalcitePlanContext.create(config, 10000, queryType); return relNodeVisitor.analyze(ast, calcitePlanContext); } diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java index 422aff377de..14012445f60 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java @@ -68,7 +68,7 @@ public void testPPLQueryPlanningWithMultipleCatalogs() { } @Test - public void testSchemaCaching() { + public void testPPLQueryPlanningWithSchemaCaching() { UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) @@ -77,7 +77,7 @@ public void testSchemaCaching() { .build(); RelNode plan = planner.plan("source = opensearch.test | eval f = abs(id)"); - assertNotNull("Planner should work with caching enabled", plan); + assertNotNull("Planner should work with schema caching enabled", plan); } @Test(expected = NullPointerException.class) diff --git a/settings.gradle b/settings.gradle index 9b7284aa965..bbb0ecf50a5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ rootProject.name = 'opensearch-sql' include 'opensearch-sql-plugin' project(':opensearch-sql-plugin').projectDir = file('plugin') +include 'api' include 'ppl' include 'common' include 'opensearch' @@ -21,7 +22,6 @@ include 'datasources' include 'async-query-core' include 'async-query' include 'language-grammar' -include 'api' // exclude integ-test/doctest in case of offline build since they need downloads if (!gradle.startParameter.offline) { From a55913a4387074df14a827512336794cdd433887 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 23 Jun 2025 14:21:34 -0700 Subject: [PATCH 6/6] Add default namespace API with more UTs Signed-off-by: Chen Dai --- api/README.md | 3 +- .../sql/api/UnifiedQueryPlanner.java | 73 ++++++++++--- .../sql/api/UnifiedQueryPlannerTest.java | 103 ++++++++++++++++-- 3 files changed, 154 insertions(+), 25 deletions(-) diff --git a/api/README.md b/api/README.md index e13258e7946..0288b7ad22c 100644 --- a/api/README.md +++ b/api/README.md @@ -14,7 +14,8 @@ Use the declarative, fluent builder API to initialize the `UnifiedQueryPlanner`. UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) .catalog("opensearch", schema) - .cacheSchema(true) + .defaultNamespace("opensearch") + .cacheMetadata(true) .build(); RelNode plan = planner.plan("source = opensearch.test"); diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java index 70cc2d56d04..6300ddb358c 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -30,7 +30,7 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.common.antlr.Parser; -import org.opensearch.sql.executor.OpenSearchTypeSystem; +import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.opensearch.sql.ppl.parser.AstBuilder; @@ -59,22 +59,30 @@ public class UnifiedQueryPlanner { * * @param queryType the query language type (e.g., PPL) * @param rootSchema the root Calcite schema containing all catalogs and tables + * @param defaultPath dot-separated path of schema to set as default schema */ - public UnifiedQueryPlanner(QueryType queryType, SchemaPlus rootSchema) { + public UnifiedQueryPlanner(QueryType queryType, SchemaPlus rootSchema, String defaultPath) { this.queryType = queryType; this.parser = buildQueryParser(queryType); - this.config = buildCalciteConfig(rootSchema); + this.config = buildCalciteConfig(rootSchema, defaultPath); } /** * Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate - * optimal physical plan to fully unify query execution and leverage Calcite's optimzer. + * optimal physical plan to fully unify query execution and leverage Calcite's optimizer. * * @param query the raw query string in PPL or other supported syntax * @return a logical plan representing the query */ public RelNode plan(String query) { - return preserveCollation(analyze(parse(query))); + try { + return preserveCollation(analyze(parse(query))); + } catch (SyntaxCheckException e) { + // Re-throw syntax error without wrapping + throw e; + } catch (Exception e) { + throw new IllegalStateException("Failed to plan query", e); + } } private Parser buildQueryParser(QueryType queryType) { @@ -84,23 +92,44 @@ private Parser buildQueryParser(QueryType queryType) { throw new IllegalArgumentException("Unsupported query type: " + queryType); } - private FrameworkConfig buildCalciteConfig(SchemaPlus defaultSchema) { + private FrameworkConfig buildCalciteConfig(SchemaPlus rootSchema, String defaultPath) { + SchemaPlus defaultSchema = findSchemaByPath(rootSchema, defaultPath); return Frameworks.newConfigBuilder() - .parserConfig(SqlParser.Config.DEFAULT) // TODO check + .parserConfig(SqlParser.Config.DEFAULT) .defaultSchema(defaultSchema) .traitDefs((List) null) .programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE)) - .typeSystem(OpenSearchTypeSystem.INSTANCE) .build(); } + private static SchemaPlus findSchemaByPath(SchemaPlus rootSchema, String defaultPath) { + if (defaultPath == null) { + return rootSchema; + } + + // Find schema by the path recursively + SchemaPlus current = rootSchema; + for (String part : defaultPath.split("\\.")) { + current = current.getSubSchema(part); + if (current == null) { + throw new IllegalArgumentException("Invalid default catalog path: " + defaultPath); + } + } + return current; + } + private UnresolvedPlan parse(String query) { ParseTree cst = parser.parse(query); AstStatementBuilder astStmtBuilder = new AstStatementBuilder( new AstBuilder(query), AstStatementBuilder.StatementBuilderContext.builder().build()); Statement statement = cst.accept(astStmtBuilder); - return ((Query) statement).getPlan(); + + if (statement instanceof Query) { + return ((Query) statement).getPlan(); + } + throw new UnsupportedOperationException( + "Only query statements are supported but got " + statement.getClass().getSimpleName()); } private RelNode analyze(UnresolvedPlan ast) { @@ -129,8 +158,9 @@ public static Builder builder() { */ public static class Builder { private final Map catalogs = new HashMap<>(); + private String defaultNamespace; private QueryType queryType; - private boolean cacheSchema; + private boolean cacheMetadata; /** * Sets the query language frontend to be used by the planner. @@ -158,13 +188,24 @@ public Builder catalog(String name, Schema schema) { } /** - * Enables or disables schema caching in the root schema. + * Sets the default namespace path for resolving unqualified table names. + * + * @param namespace dot-separated path (e.g., "spark_catalog.default" or "opensearch") + * @return this builder instance + */ + public Builder defaultNamespace(String namespace) { + this.defaultNamespace = namespace; + return this; + } + + /** + * Enables or disables catalog metadata caching in the root schema. * - * @param cache whether to enable schema caching + * @param cache whether to enable metadata caching * @return this builder instance */ - public Builder cacheSchema(boolean cache) { - this.cacheSchema = cache; + public Builder cacheMetadata(boolean cache) { + this.cacheMetadata = cache; return this; } @@ -175,9 +216,9 @@ public Builder cacheSchema(boolean cache) { */ public UnifiedQueryPlanner build() { Objects.requireNonNull(queryType, "Must specify language before build"); - SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheSchema).plus(); + SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheMetadata).plus(); catalogs.forEach(rootSchema::add); - return new UnifiedQueryPlanner(queryType, rootSchema); + return new UnifiedQueryPlanner(queryType, rootSchema, defaultNamespace); } } } diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java index 14012445f60..0f7754ba501 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerTest.java @@ -6,17 +6,20 @@ package org.opensearch.sql.api; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Test; +import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.executor.QueryType; public class UnifiedQueryPlannerTest { @@ -27,7 +30,7 @@ public class UnifiedQueryPlannerTest { @Override protected Map getTableMap() { return Map.of( - "test", + "index", new AbstractTable() { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { @@ -41,6 +44,15 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { } }; + /** Test catalog consists of test schema above */ + private final AbstractSchema testDeepSchema = + new AbstractSchema() { + @Override + protected Map getSubSchemaMap() { + return Map.of("opensearch", testSchema); + } + }; + @Test public void testPPLQueryPlanning() { UnifiedQueryPlanner planner = @@ -49,10 +61,40 @@ public void testPPLQueryPlanning() { .catalog("opensearch", testSchema) .build(); - RelNode plan = planner.plan("source = opensearch.test | eval f = abs(id)"); + RelNode plan = planner.plan("source = opensearch.index | eval f = abs(id)"); assertNotNull("Plan should be created", plan); } + @Test + public void testPPLQueryPlanningWithDefaultNamespace() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", testSchema) + .defaultNamespace("opensearch") + .build(); + + assertNotNull("Plan should be created", planner.plan("source = opensearch.index")); + assertNotNull("Plan should be created", planner.plan("source = index")); + } + + @Test + public void testPPLQueryPlanningWithDefaultNamespaceMultiLevel() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("catalog", testDeepSchema) + .defaultNamespace("catalog.opensearch") + .build(); + + assertNotNull("Plan should be created", planner.plan("source = catalog.opensearch.index")); + assertNotNull("Plan should be created", planner.plan("source = index")); + + // This is valid in SparkSQL, but Calcite requires "catalog" as the default root schema to + // resolve it + assertThrows(IllegalStateException.class, () -> planner.plan("source = opensearch.index")); + } + @Test public void testPPLQueryPlanningWithMultipleCatalogs() { UnifiedQueryPlanner planner = @@ -63,21 +105,35 @@ public void testPPLQueryPlanningWithMultipleCatalogs() { .build(); RelNode plan = - planner.plan("source = catalog1.test | lookup catalog2.test id | eval f = abs(id)"); + planner.plan("source = catalog1.index | lookup catalog2.index id | eval f = abs(id)"); assertNotNull("Plan should be created with multiple catalogs", plan); } @Test - public void testPPLQueryPlanningWithSchemaCaching() { + public void testPPLQueryPlanningWithMultipleCatalogsAndDefaultNamespace() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("catalog1", testSchema) + .catalog("catalog2", testSchema) + .defaultNamespace("catalog2") + .build(); + + RelNode plan = planner.plan("source = catalog1.index | lookup index id | eval f = abs(id)"); + assertNotNull("Plan should be created with multiple catalogs", plan); + } + + @Test + public void testPPLQueryPlanningWithMetadataCaching() { UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder() .language(QueryType.PPL) .catalog("opensearch", testSchema) - .cacheSchema(true) + .cacheMetadata(true) .build(); - RelNode plan = planner.plan("source = opensearch.test | eval f = abs(id)"); - assertNotNull("Planner should work with schema caching enabled", plan); + RelNode plan = planner.plan("source = opensearch.index"); + assertNotNull("Plan should be created", plan); } @Test(expected = NullPointerException.class) @@ -88,8 +144,39 @@ public void testMissingQueryLanguage() { @Test(expected = IllegalArgumentException.class) public void testUnsupportedQueryLanguage() { UnifiedQueryPlanner.builder() - .language(QueryType.SQL) // only PPL is supported + .language(QueryType.SQL) // only PPL is supported for now + .catalog("opensearch", testSchema) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidDefaultNamespacePath() { + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) .catalog("opensearch", testSchema) + .defaultNamespace("nonexistent") // nonexistent namespace path .build(); } + + @Test(expected = IllegalStateException.class) + public void testUnsupportedStatementType() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", testSchema) + .build(); + + planner.plan("explain source = index"); // explain statement + } + + @Test(expected = SyntaxCheckException.class) + public void testPlanPropagatingSyntaxCheckException() { + UnifiedQueryPlanner planner = + UnifiedQueryPlanner.builder() + .language(QueryType.PPL) + .catalog("opensearch", testSchema) + .build(); + + planner.plan("source = index | eval"); // Trigger syntax error from parser + } }