Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -807,6 +808,11 @@ public LogicalPlan visitAppend(Append node, AnalysisContext context) {
throw getOnlyForCalciteException("Append");
}

@Override
public LogicalPlan visitMultisearch(Multisearch node, AnalysisContext context) {
throw getOnlyForCalciteException("Multisearch");
}

private LogicalSort buildSort(
LogicalPlan child, AnalysisContext context, Integer count, List<Field> sortFields) {
ExpressionReferenceOptimizer optimizer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -432,4 +433,8 @@ public T visitAppendCol(AppendCol node, C context) {
public T visitAppend(Append node, C context) {
return visitChildren(node, context);
}

public T visitMultisearch(Multisearch node, C context) {
return visitChildren(node, context);
}
}
47 changes: 47 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Multisearch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/** Logical plan node for Multisearch operation. Combines results from multiple search queries. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class Multisearch extends UnresolvedPlan {

private UnresolvedPlan child;
private final List<UnresolvedPlan> subsearches;

public Multisearch(List<UnresolvedPlan> subsearches) {
this.subsearches = subsearches;
}

@Override
public Multisearch attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
if (this.child == null) {
return ImmutableList.copyOf(subsearches);
} else {
return ImmutableList.<UnresolvedPlan>builder().add(this.child).addAll(subsearches).build();
}
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitMultisearch(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
Expand All @@ -58,7 +59,6 @@
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.calcite.util.Holder;
Expand Down Expand Up @@ -108,6 +108,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -1631,65 +1632,73 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
node.getSubSearch().accept(new EmptySourcePropagateVisitor(), null);
prunedSubSearch.accept(this, context);

// 3. Merge two query schemas
// 3. Merge two query schemas using shared logic
RelNode subsearchNode = context.relBuilder.build();
RelNode mainNode = context.relBuilder.build();
List<RelDataTypeField> mainFields = mainNode.getRowType().getFieldList();
List<RelDataTypeField> subsearchFields = subsearchNode.getRowType().getFieldList();
Map<String, RelDataTypeField> subsearchFieldMap =
subsearchFields.stream()
.map(typeField -> Pair.of(typeField.getName(), typeField))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
boolean[] isSelected = new boolean[subsearchFields.size()];
List<String> names = new ArrayList<>();
List<RexNode> mainUnionProjects = new ArrayList<>();
List<RexNode> subsearchUnionProjects = new ArrayList<>();

// 3.1 Start with main query's schema. If subsearch plan doesn't have matched column,
// add same type column in place with NULL literal
for (int i = 0; i < mainFields.size(); i++) {
mainUnionProjects.add(context.rexBuilder.makeInputRef(mainNode, i));
RelDataTypeField mainField = mainFields.get(i);
RelDataTypeField subsearchField = subsearchFieldMap.get(mainField.getName());
names.add(mainField.getName());
if (subsearchFieldMap.containsKey(mainField.getName())
&& subsearchField != null
&& subsearchField.getType().equals(mainField.getType())) {
subsearchUnionProjects.add(
context.rexBuilder.makeInputRef(subsearchNode, subsearchField.getIndex()));
isSelected[subsearchField.getIndex()] = true;
} else {
subsearchUnionProjects.add(context.rexBuilder.makeNullLiteral(mainField.getType()));
}

// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subsearchNode);
List<RelNode> projectedNodes =
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);

// 4. Union the projected plans
for (RelNode projectedNode : projectedNodes) {
context.relBuilder.push(projectedNode);
}
context.relBuilder.union(true);
return context.relBuilder.peek();
}

// 3.2 Add remaining subsearch columns to the merged schema
for (int j = 0; j < subsearchFields.size(); j++) {
RelDataTypeField subsearchField = subsearchFields.get(j);
if (!isSelected[j]) {
mainUnionProjects.add(context.rexBuilder.makeNullLiteral(subsearchField.getType()));
subsearchUnionProjects.add(context.rexBuilder.makeInputRef(subsearchNode, j));
names.add(subsearchField.getName());
@Override
public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
List<RelNode> subsearchNodes = new ArrayList<>();
for (UnresolvedPlan subsearch : node.getSubsearches()) {
UnresolvedPlan prunedSubSearch = subsearch.accept(new EmptySourcePropagateVisitor(), null);
prunedSubSearch.accept(this, context);
subsearchNodes.add(context.relBuilder.build());
}

// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> alignedNodes =
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(subsearchNodes, context);

for (RelNode alignedNode : alignedNodes) {
context.relBuilder.push(alignedNode);
}
context.relBuilder.union(true, alignedNodes.size());

RelDataType rowType = context.relBuilder.peek().getRowType();
String timestampField = findTimestampField(rowType);
if (timestampField != null) {
RelDataTypeField timestampFieldRef = rowType.getField(timestampField, false, false);
if (timestampFieldRef != null) {
RexNode timestampRef =
context.rexBuilder.makeInputRef(
context.relBuilder.peek(), timestampFieldRef.getIndex());
context.relBuilder.sort(context.relBuilder.desc(timestampRef));
}
}

// 3.3 Uniquify names in case the merged names have duplicates
List<String> uniqNames =
SqlValidatorUtil.uniquify(names, SqlValidatorUtil.EXPR_SUGGESTER, true);

// 4. Apply new schema over two query plans
RelNode projectedMainNode =
context.relBuilder.push(mainNode).project(mainUnionProjects, uniqNames).build();
RelNode projectedSubsearchNode =
context.relBuilder.push(subsearchNode).project(subsearchUnionProjects, uniqNames).build();

// 5. Union all two projected plans
context.relBuilder.push(projectedMainNode);
context.relBuilder.push(projectedSubsearchNode);
context.relBuilder.union(true);
return context.relBuilder.peek();
}

/**
* Finds the timestamp field for multisearch ordering.
*
* @param rowType The row type to search for timestamp fields
* @return The name of the timestamp field, or null if not found
*/
private String findTimestampField(RelDataType rowType) {
String[] candidates = {"@timestamp", "_time", "timestamp", "time"};
for (String fieldName : candidates) {
RelDataTypeField field = rowType.getField(fieldName, false, false);
if (field != null) {
return fieldName;
}
}
return null;
}

/*
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
*/
Expand Down
158 changes: 158 additions & 0 deletions core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlValidatorUtil;

/**
* Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses
* the same strategy as append command - renames conflicting fields to avoid type conflicts.
*/
public class SchemaUnifier {

/**
* Builds a unified schema for multiple nodes with type conflict resolution.
*
* @param nodes List of RelNodes to unify schemas for
* @param context Calcite plan context
* @return List of projected RelNodes with unified schema
*/
public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
List<RelNode> nodes, CalcitePlanContext context) {
if (nodes.isEmpty()) {
return new ArrayList<>();
}

if (nodes.size() == 1) {
return nodes;
}

// Step 1: Build the unified schema by processing all nodes
List<SchemaField> unifiedSchema = buildUnifiedSchema(nodes);

// Step 2: Create projections for each node to align with unified schema
List<RelNode> projectedNodes = new ArrayList<>();
List<String> fieldNames =
unifiedSchema.stream().map(SchemaField::getName).collect(Collectors.toList());

for (RelNode node : nodes) {
List<RexNode> projection = buildProjectionForNode(node, unifiedSchema, context);
RelNode projectedNode = context.relBuilder.push(node).project(projection, fieldNames).build();
projectedNodes.add(projectedNode);
}

// Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.)
List<String> uniqueNames =
SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true);

// Step 4: Re-project with unique names if needed
if (!uniqueNames.equals(fieldNames)) {
List<RelNode> renamedNodes = new ArrayList<>();
for (RelNode node : projectedNodes) {
RelNode renamedNode =
context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build();
renamedNodes.add(renamedNode);
}
return renamedNodes;
}

return projectedNodes;
}

/**
* Builds a unified schema by merging fields from all nodes. Fields with the same name but
* different types are added as separate entries (which will be renamed during uniquification).
*
* @param nodes List of RelNodes to merge schemas from
* @return List of SchemaField representing the unified schema (may contain duplicate names)
*/
private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
List<SchemaField> schema = new ArrayList<>();
Map<String, Set<RelDataType>> seenFields = new HashMap<>();

for (RelNode node : nodes) {
for (RelDataTypeField field : node.getRowType().getFieldList()) {
String fieldName = field.getName();
RelDataType fieldType = field.getType();

// Track which (name, type) combinations we've seen
Set<RelDataType> typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>());

if (!typesForName.contains(fieldType)) {
// New field or same name with different type - add to schema
schema.add(new SchemaField(fieldName, fieldType));
typesForName.add(fieldType);
}
// If we've seen this exact (name, type) combination, skip it
}
}

return schema;
}

/**
* Builds a projection for a node to align with the unified schema. For each field in the unified
* schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL
*
* @param node The node to build projection for
* @param unifiedSchema List of SchemaField representing the unified schema
* @param context Calcite plan context
* @return List of RexNode representing the projection
*/
private static List<RexNode> buildProjectionForNode(
RelNode node, List<SchemaField> unifiedSchema, CalcitePlanContext context) {
Map<String, RelDataTypeField> nodeFieldMap =
node.getRowType().getFieldList().stream()
.collect(Collectors.toMap(RelDataTypeField::getName, field -> field));

List<RexNode> projection = new ArrayList<>();
for (SchemaField schemaField : unifiedSchema) {
String fieldName = schemaField.getName();
RelDataType expectedType = schemaField.getType();
RelDataTypeField nodeField = nodeFieldMap.get(fieldName);

if (nodeField != null && nodeField.getType().equals(expectedType)) {
// Field exists with matching type - use it
projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex()));
} else {
// Field missing or type mismatch - project NULL
projection.add(context.rexBuilder.makeNullLiteral(expectedType));
}
}

return projection;
}

/** Represents a field in the unified schema with name and type. */
private static class SchemaField {
private final String name;
private final RelDataType type;

SchemaField(String name, RelDataType type) {
this.name = name;
this.type = type;
}

String getName() {
return name;
}

RelDataType getType() {
return type;
}
}
}
Loading
Loading