Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,34 @@ Together, these components enable a complete workflow: parse PPL queries into lo

## Usage

### UnifiedQueryPlanner
### UnifiedQueryContext

`UnifiedQueryContext` is a reusable abstraction shared across unified query components (planner, compiler, etc.). It bundles `CalcitePlanContext` and `Settings` into a single object, centralizing configuration for all unified query operations.

Use the declarative, fluent builder API to initialize the `UnifiedQueryPlanner`.
Create a context with catalog configuration, query type, and optional settings:

```java
UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder()
UnifiedQueryContext context = UnifiedQueryContext.builder()
.language(QueryType.PPL)
.catalog("opensearch", schema)
.catalog("opensearch", opensearchSchema)
.catalog("spark_catalog", sparkSchema)
.defaultNamespace("opensearch")
.cacheMetadata(true)
.setting("plugins.query.size_limit", 200)
.build();
```

### UnifiedQueryPlanner

RelNode plan = planner.plan("source = opensearch.test");
Use `UnifiedQueryPlanner` to parse and analyze PPL queries into Calcite logical plans. The planner accepts a `UnifiedQueryContext` and can be reused for multiple queries.

```java
// Create planner with context
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);

// Plan multiple queries (context is reused)
RelNode plan1 = planner.plan("source = logs | where status = 200");
RelNode plan2 = planner.plan("source = metrics | stats avg(cpu)");
```

### UnifiedQueryTranspiler
Expand All @@ -46,25 +61,28 @@ String sql = transpiler.toSql(plan);

### Complete Workflow Example

Combining both components to transpile PPL queries into target database SQL:
Combining all components to transpile PPL queries into target database SQL:

```java
// Step 1: Initialize planner
UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder()
// Step 1: Create reusable context (shared across components)
UnifiedQueryContext context = UnifiedQueryContext.builder()
.language(QueryType.PPL)
.catalog("catalog", schema)
.defaultNamespace("catalog")
.build();

// Step 2: Parse PPL query into logical plan
// Step 2: Create planner with context
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);

// Step 3: Plan PPL query into logical plan
RelNode plan = planner.plan("source = employees | where age > 30");

// Step 3: Initialize transpiler with target dialect
// Step 4: Create transpiler with target dialect
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
.dialect(SparkSqlDialect.DEFAULT)
.build();

// Step 4: Transpile to target SQL
// Step 5: Transpile to target SQL
String sparkSql = transpiler.toSql(plan);
// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30
```
Expand Down
189 changes: 189 additions & 0 deletions api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api;

import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.Value;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Programs;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;

/**
 * A reusable abstraction shared across unified query components (planner, compiler, etc.). This
 * centralizes configuration for catalog schemas, query type, execution limits, and other settings,
 * enabling consistent behavior across all unified query operations.
 *
 * <p>Instances are created through {@link #builder()}. The all-args constructor invoked by the
 * builder is the one generated by Lombok's {@code @Value}.
 */
@Value
public class UnifiedQueryContext {

  /** CalcitePlanContext containing Calcite framework configuration and query type. */
  CalcitePlanContext planContext;

  /** Settings containing execution limits and feature flags used by parsers and planners. */
  Settings settings;

  /**
   * Creates a new builder for UnifiedQueryContext.
   *
   * @return a fresh {@link Builder} pre-populated with the default planning limits
   */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Builder that constructs UnifiedQueryContext.
   *
   * <p>Each call to {@link #build()} captures the configuration as it is at that moment; changing
   * the builder afterwards does not alter the settings of contexts that were already built.
   */
  public static class Builder {
    private QueryType queryType;
    private final Map<String, Schema> catalogs = new HashMap<>();
    private String defaultNamespace;
    private boolean cacheMetadata = false;

    /**
     * Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
     * to avoid coupling with OpenSearchSettings.
     */
    private final Map<Settings.Key, Object> settings =
        new HashMap<Settings.Key, Object>(
            Map.of(
                QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
                PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
                PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit()));

    /**
     * Sets the query language frontend to be used.
     *
     * @param queryType the {@link QueryType}, such as PPL
     * @return this builder instance
     */
    public Builder language(QueryType queryType) {
      this.queryType = queryType;
      return this;
    }

    /**
     * Registers a catalog with the specified name and its associated schema. The schema can be a
     * flat or nested structure (e.g., catalog → schema → table), depending on how data is
     * organized. Registering the same name twice replaces the earlier schema.
     *
     * @param name the name of the catalog to register
     * @param schema the schema representing the structure of the catalog
     * @return this builder instance
     */
    public Builder catalog(String name, Schema schema) {
      catalogs.put(name, schema);
      return this;
    }

    /**
     * Sets the default namespace path for resolving unqualified table names. The path is resolved
     * against the registered catalogs when {@link #build()} is called, not here.
     *
     * @param namespace dot-separated path (e.g., "spark_catalog.default" or "opensearch")
     * @return this builder instance
     */
    public Builder defaultNamespace(String namespace) {
      this.defaultNamespace = namespace;
      return this;
    }

    /**
     * Enables or disables catalog metadata caching in the root schema.
     *
     * @param cache whether to enable metadata caching
     * @return this builder instance
     */
    public Builder cacheMetadata(boolean cache) {
      this.cacheMetadata = cache;
      return this;
    }

    /**
     * Sets a specific setting value by name, overriding any default or earlier value for the same
     * key.
     *
     * @param name the setting key name (e.g., "plugins.query.size_limit")
     * @param value the setting value
     * @return this builder instance
     * @throws IllegalArgumentException if the setting name is not recognized
     */
    public Builder setting(String name, Object value) {
      Settings.Key key =
          Settings.Key.of(name)
              .orElseThrow(() -> new IllegalArgumentException("Unknown setting name: " + name));
      settings.put(key, value);
      return this;
    }

    /**
     * Builds a {@link UnifiedQueryContext} with the configuration.
     *
     * @return a new instance of {@link UnifiedQueryContext}
     * @throws NullPointerException if no language was specified via {@link #language(QueryType)}
     * @throws IllegalArgumentException if the default namespace does not resolve to a registered
     *     schema
     */
    public UnifiedQueryContext build() {
      Objects.requireNonNull(queryType, "Must specify language before build");

      // Named distinctly from the builder's own "settings" map to avoid shadowing it.
      Settings builtSettings = buildSettings();
      CalcitePlanContext planContext =
          CalcitePlanContext.create(
              buildFrameworkConfig(), SysLimit.fromSettings(builtSettings), queryType);
      return new UnifiedQueryContext(planContext, builtSettings);
    }

    /**
     * Creates a {@link Settings} view over a snapshot of the values configured so far.
     *
     * <p>The snapshot matters: the limits inside the plan context are derived once at build time,
     * so the returned settings must not drift if this builder is modified or reused afterwards.
     */
    private Settings buildSettings() {
      // Copy instead of capturing the live builder map; HashMap also tolerates null values.
      final Map<Settings.Key, Object> snapshot = new HashMap<>(settings);
      return new Settings() {
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getSettingValue(Key key) {
          // Keys that were never configured yield null.
          return (T) snapshot.get(key);
        }

        @Override
        public List<?> getSettings() {
          return List.copyOf(snapshot.entrySet());
        }
      };
    }

    /**
     * Assembles the Calcite framework configuration: a root schema holding every registered
     * catalog, with the default schema resolved from the configured default namespace.
     */
    @SuppressWarnings({"rawtypes"})
    private FrameworkConfig buildFrameworkConfig() {
      // First flag asks Calcite to add its metadata schema; second toggles schema caching.
      SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheMetadata).plus();
      catalogs.forEach(rootSchema::add);

      SchemaPlus defaultSchema = findSchemaByPath(rootSchema, defaultNamespace);
      return Frameworks.newConfigBuilder()
          .parserConfig(SqlParser.Config.DEFAULT)
          .defaultSchema(defaultSchema)
          // The cast picks the List overload over the varargs one.
          // NOTE(review): null presumably leaves trait defs at the planner default — confirm.
          .traitDefs((List<RelTraitDef>) null)
          .programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
          .build();
    }

    /**
     * Walks a dot-separated path down from the root schema.
     *
     * @param rootSchema the schema to start from
     * @param defaultPath dot-separated sub-schema path, or {@code null} to use the root itself
     * @return the schema found at the end of the path
     * @throws IllegalArgumentException if any segment of the path has no matching sub-schema
     */
    private SchemaPlus findSchemaByPath(SchemaPlus rootSchema, String defaultPath) {
      if (defaultPath == null) {
        return rootSchema;
      }

      SchemaPlus current = rootSchema;
      for (String part : defaultPath.split("\\.")) {
        current = current.getSubSchema(part);
        if (current == null) {
          throw new IllegalArgumentException("Invalid default catalog path: " + defaultPath);
        }
      }
      return current;
    }
  }
}
Loading
Loading