Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnionRecursive;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
Expand Down Expand Up @@ -855,6 +856,11 @@ public LogicalPlan visitMultisearch(Multisearch node, AnalysisContext context) {
throw getOnlyForCalciteException("Multisearch");
}

@Override
public LogicalPlan visitUnionRecursive(UnionRecursive node, AnalysisContext context) {
throw getOnlyForCalciteException("UnionRecursive");
}

private LogicalSort buildSort(
LogicalPlan child, AnalysisContext context, Integer count, List<Field> sortFields) {
ExpressionReferenceOptimizer optimizer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnionRecursive;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;

Expand Down Expand Up @@ -146,6 +147,10 @@ public T visitAppendPipe(AppendPipe node, C context) {
return visitChildren(node, context);
}

public T visitUnionRecursive(UnionRecursive node, C context) {
return visitChildren(node, context);
}

public T visitFilter(Filter node, C context) {
return visitChildren(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnionRecursive;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;

Expand Down Expand Up @@ -65,6 +66,15 @@ public UnresolvedPlan visitAppendCol(AppendCol node, Void context) {
return new AppendCol(node.isOverride(), subSearch).attach(child);
}

@Override
public UnresolvedPlan visitUnionRecursive(UnionRecursive node, Void context) {
UnresolvedPlan recursiveSubsearch = node.getRecursiveSubsearch().accept(this, context);
UnresolvedPlan child = node.getChild().get(0).accept(this, context);
return new UnionRecursive(
node.getRelationName(), node.getMaxDepth(), node.getMaxRows(), recursiveSubsearch)
.attach(child);
}

// TODO: Revisit lookup logic here but for now we don't see use case yet
@Override
public UnresolvedPlan visitLookup(Lookup node, Void context) {
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnionRecursive;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
Expand Down Expand Up @@ -569,6 +570,15 @@ public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquer
return new AppendPipe(subquery).attach(input);
}

public static UnionRecursive unionRecursive(
UnresolvedPlan input,
String relationName,
Integer maxDepth,
Integer maxRows,
UnresolvedPlan recursiveSubsearch) {
return new UnionRecursive(relationName, maxDepth, maxRows, recursiveSubsearch).attach(input);
}

public static Trendline.TrendlineComputation computation(
Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) {
return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/** Logical plan node of UNION RECURSIVE, the interface for recursive union queries. */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class UnionRecursive extends UnresolvedPlan {

private final String relationName;
private final Integer maxDepth;
private final Integer maxRows;
private final UnresolvedPlan recursiveSubsearch;

private UnresolvedPlan child;

@Override
public UnionRecursive attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
return visitor.visitUnionRecursive(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.sql.Connection;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -17,6 +19,7 @@
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexLambdaRef;
import org.apache.calcite.rex.RexNode;
Expand Down Expand Up @@ -59,6 +62,7 @@ public class CalcitePlanContext {

private final Stack<RexCorrelVariable> correlVar = new Stack<>();
private final Stack<List<RexNode>> windowPartitions = new Stack<>();
private final Deque<RecursiveRelationInfo> recursiveRelations = new ArrayDeque<>();

@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;

Expand Down Expand Up @@ -99,6 +103,7 @@ private CalcitePlanContext(CalcitePlanContext parent) {
this.rexLambdaRefMap = new HashMap<>(); // New map for lambda variables
this.capturedVariables = new ArrayList<>(); // New list for captured variables
this.inLambdaContext = true; // Mark that we're inside a lambda
this.recursiveRelations.addAll(parent.recursiveRelations);
}

public RexNode resolveJoinCondition(
Expand Down Expand Up @@ -130,6 +135,29 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
}
}

public void pushRecursiveRelation(String relationName, RelDataType rowType) {
recursiveRelations.push(new RecursiveRelationInfo(relationName, rowType));
}

public Optional<RecursiveRelationInfo> findRecursiveRelation(String relationName) {
if (relationName == null) {
return Optional.empty();
}
String relationNameLower = relationName.toLowerCase(java.util.Locale.ROOT);
for (RecursiveRelationInfo info : recursiveRelations) {
if (info.nameLower.equals(relationNameLower)) {
return Optional.of(info);
}
}
return Optional.empty();
}

public void popRecursiveRelation() {
if (!recursiveRelations.isEmpty()) {
recursiveRelations.pop();
}
}

/**
* Creates a clone of this context that shares the relBuilder with the parent. This allows lambda
* expressions to reference fields from the current row while having their own lambda variable
Expand Down Expand Up @@ -206,4 +234,17 @@ public RexLambdaRef captureVariable(RexNode fieldRef, String fieldName) {

return lambdaRef;
}

@Getter
public static final class RecursiveRelationInfo {
private final String name;
private final String nameLower;
private final RelDataType rowType;

private RecursiveRelationInfo(String name, RelDataType rowType) {
this.name = name;
this.nameLower = name.toLowerCase(java.util.Locale.ROOT);
this.rowType = rowType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.PatternMethod;
import org.opensearch.sql.ast.expression.PatternMode;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.Span;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
Expand Down Expand Up @@ -142,6 +143,7 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.Trendline.TrendlineType;
import org.opensearch.sql.ast.tree.UnionRecursive;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
Expand Down Expand Up @@ -184,6 +186,13 @@ public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {

@Override
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
Optional<CalcitePlanContext.RecursiveRelationInfo> recursiveRelation =
findRecursiveRelation(node, context);
if (recursiveRelation.isPresent()) {
CalcitePlanContext.RecursiveRelationInfo relationInfo = recursiveRelation.get();
context.relBuilder.transientScan(relationInfo.getName(), relationInfo.getRowType());
return context.relBuilder.peek();
}
DataSourceSchemaIdentifierNameResolver nameResolver =
new DataSourceSchemaIdentifierNameResolver(
dataSourceService, node.getTableQualifiedName().getParts());
Expand All @@ -207,6 +216,19 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) {
return scan;
}

private Optional<CalcitePlanContext.RecursiveRelationInfo> findRecursiveRelation(
Relation node, CalcitePlanContext context) {
List<QualifiedName> qualifiedNames = node.getQualifiedNames();
if (qualifiedNames.size() != 1) {
return Optional.empty();
}
QualifiedName qualifiedName = qualifiedNames.get(0);
if (qualifiedName.getParts().size() != 1) {
return Optional.empty();
}
return context.findRecursiveRelation(qualifiedName.getParts().get(0));
}

// This is a tool method to add an existed RelOptTable to builder stack, not used for now
private RelBuilder scan(RelOptTable tableSchema, CalcitePlanContext context) {
final RelNode scan =
Expand Down Expand Up @@ -2229,6 +2251,78 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context);
}

@Override
public RelNode visitUnionRecursive(UnionRecursive node, CalcitePlanContext context) {
visitChildren(node, context);
RelNode anchorNode = context.relBuilder.build();
RelDataType anchorRowType = anchorNode.getRowType();

context.pushRecursiveRelation(node.getRelationName(), anchorRowType);
try {
UnresolvedPlan prunedSubSearch =
node.getRecursiveSubsearch().accept(new EmptySourcePropagateVisitor(), null);
prunedSubSearch.accept(this, context);
} finally {
context.popRecursiveRelation();
}

RelNode recursiveNode = context.relBuilder.build();
validateUnionRecursiveSchema(anchorRowType, recursiveNode.getRowType(), node.getRelationName());

context.relBuilder.push(anchorNode);
context.relBuilder.push(recursiveNode);
int iterationLimit = node.getMaxDepth() == null ? -1 : node.getMaxDepth();
context.relBuilder.repeatUnion(node.getRelationName(), true, iterationLimit);

if (node.getMaxRows() != null) {
PlanUtils.replaceTop(
context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.QUERY_SIZE_LIMIT,
context.relBuilder.peek(),
context.relBuilder.literal(node.getMaxRows())));
}

return context.relBuilder.peek();
}

private void validateUnionRecursiveSchema(
RelDataType anchorRowType, RelDataType recursiveRowType, String relationName) {
List<RelDataTypeField> anchorFields = anchorRowType.getFieldList();
List<RelDataTypeField> recursiveFields = recursiveRowType.getFieldList();

if (anchorFields.size() != recursiveFields.size()) {
throw new SemanticCheckException(
"UNION RECURSIVE schema mismatch for relation "
+ relationName
+ ": anchor field count "
+ anchorFields.size()
+ " does not match recursive field count "
+ recursiveFields.size());
}

for (int i = 0; i < anchorFields.size(); i++) {
RelDataTypeField anchorField = anchorFields.get(i);
RelDataTypeField recursiveField = recursiveFields.get(i);
if (!anchorField.getName().equalsIgnoreCase(recursiveField.getName())) {
throw new SemanticCheckException(
"UNION RECURSIVE schema mismatch for relation "
+ relationName
+ ": anchor field name "
+ anchorField.getName()
+ " does not match recursive field name "
+ recursiveField.getName());
}
if (!SqlTypeUtil.equalSansNullability(anchorField.getType(), recursiveField.getType())) {
throw new SemanticCheckException(
"UNION RECURSIVE schema mismatch for relation "
+ relationName
+ ": field type mismatch for "
+ anchorField.getName());
}
}
}

private RelNode mergeTableAndResolveColumnConflict(
RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) {
// Use shared schema merging logic that handles type conflicts via field renaming
Expand Down
22 changes: 22 additions & 0 deletions docs/dev/union_recursive_task1_context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Task 1 Context - UNION RECURSIVE Syntax/Grammar

## What changed
- Added `UNION RECURSIVE` command grammar and recursive subpipeline rule in `ppl/src/main/antlr/OpenSearchPPLParser.g4`.
- Added lexer tokens `UNION` and `RECURSIVE` in `ppl/src/main/antlr/OpenSearchPPLLexer.g4`.
- Added parser tests (pass + fail) in `ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java`.

## Notes/decisions
- The recursive block is parsed as `recursiveSubPipeline : PIPE? subSearch` so it can include an optional leading pipe.
- The name/options use generic `ident EQUAL ...` pairs in grammar for now; AST work should interpret these as `name`, `max_depth`, `max_rows` with validation in Task 2+.
- Changes in `language-grammar/` were reverted per request; only `ppl/` grammar is updated.

## Tests run
- `./gradlew :ppl:test --tests org.opensearch.sql.ppl.antlr.PPLSyntaxParserTest`

## Files touched
- `ppl/src/main/antlr/OpenSearchPPLParser.g4`
- `ppl/src/main/antlr/OpenSearchPPLLexer.g4`
- `ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java`

## Next task pointer
- Task 2: Add AST and logical plan nodes for UNION RECURSIVE, including parsing of `name`, `max_depth`, and `max_rows` options from the grammar output.
Loading
Loading