Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2f0c405
Initial implementation for multisearch command
ahkcs Sep 15, 2025
4f9c5a6
timestamp interleaving
ahkcs Sep 18, 2025
b09dd09
removal
ahkcs Sep 18, 2025
2965078
update test
ahkcs Sep 18, 2025
00c4a6e
fix tests
ahkcs Sep 19, 2025
001a5a2
remove streaming command
ahkcs Sep 22, 2025
4dfca30
update doctest
ahkcs Sep 22, 2025
7a91631
fix doctest
ahkcs Sep 22, 2025
d006e11
add CalciteExplainIT
ahkcs Sep 22, 2025
bd7f591
formatting
ahkcs Sep 22, 2025
5579ce2
Anonymizer test
ahkcs Sep 22, 2025
b202948
fixes
ahkcs Sep 23, 2025
54259a0
removal
ahkcs Sep 24, 2025
7ccf26a
update explainIT
ahkcs Sep 24, 2025
8639c92
fix test
ahkcs Sep 24, 2025
3fc1998
fix anonymizerTest
ahkcs Sep 24, 2025
b5d7c44
fixes
ahkcs Sep 24, 2025
4cb8daf
update grammar
ahkcs Sep 25, 2025
cdbd9c4
update explainIT
ahkcs Sep 25, 2025
343ff88
update error handling
ahkcs Sep 25, 2025
f1d1b23
update explainIT to use yaml
ahkcs Sep 25, 2025
4bb0c3a
removal
ahkcs Sep 25, 2025
8232eac
add schema null filling test cases
ahkcs Sep 25, 2025
2efc13b
make @timestamp priority timestamp
ahkcs Sep 25, 2025
243981b
fix
ahkcs Sep 26, 2025
71d9736
fixes
ahkcs Sep 26, 2025
4c25348
fix test
ahkcs Sep 26, 2025
7038bf5
update doc
ahkcs Sep 29, 2025
b6312e1
fix IT
ahkcs Sep 29, 2025
1540f65
doc
ahkcs Sep 29, 2025
25b6016
remove duplication handling logic
ahkcs Sep 30, 2025
9bd54df
better formatting
ahkcs Sep 30, 2025
915495c
fixes
ahkcs Oct 1, 2025
4bb996f
CI
ahkcs Oct 1, 2025
bd249bf
fix explainIT
ahkcs Oct 1, 2025
f5546e1
Extract SchemaUnifier
ahkcs Oct 1, 2025
dfd8def
trim doctest dataset
ahkcs Oct 1, 2025
d719b80
remove template change
ahkcs Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -820,6 +821,11 @@ public LogicalPlan visitAppend(Append node, AnalysisContext context) {
throw getOnlyForCalciteException("Append");
}

@Override
public LogicalPlan visitMultisearch(Multisearch node, AnalysisContext context) {
throw getOnlyForCalciteException("Multisearch");
}

private LogicalSort buildSort(
LogicalPlan child, AnalysisContext context, Integer count, List<Field> sortFields) {
ExpressionReferenceOptimizer optimizer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -431,4 +432,8 @@ public T visitAppendCol(AppendCol node, C context) {
public T visitAppend(Append node, C context) {
return visitChildren(node, context);
}

public T visitMultisearch(Multisearch node, C context) {
return visitChildren(node, context);
}
}
47 changes: 47 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Multisearch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/** Logical plan node for Multisearch operation. Combines results from multiple search queries. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class Multisearch extends UnresolvedPlan {

private UnresolvedPlan child;
private final List<UnresolvedPlan> subsearches;

public Multisearch(List<UnresolvedPlan> subsearches) {
this.subsearches = subsearches;
}

@Override
public Multisearch attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
if (this.child == null) {
return ImmutableList.copyOf(subsearches);
} else {
return ImmutableList.<UnresolvedPlan>builder().add(this.child).addAll(subsearches).build();
}
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitMultisearch(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
Expand All @@ -60,7 +61,6 @@
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.calcite.util.Holder;
Expand Down Expand Up @@ -111,6 +111,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -1649,65 +1650,73 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
node.getSubSearch().accept(new EmptySourcePropagateVisitor(), null);
prunedSubSearch.accept(this, context);

// 3. Merge two query schemas
// 3. Merge two query schemas using shared logic
RelNode subsearchNode = context.relBuilder.build();
RelNode mainNode = context.relBuilder.build();
List<RelDataTypeField> mainFields = mainNode.getRowType().getFieldList();
List<RelDataTypeField> subsearchFields = subsearchNode.getRowType().getFieldList();
Map<String, RelDataTypeField> subsearchFieldMap =
subsearchFields.stream()
.map(typeField -> Pair.of(typeField.getName(), typeField))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
boolean[] isSelected = new boolean[subsearchFields.size()];
List<String> names = new ArrayList<>();
List<RexNode> mainUnionProjects = new ArrayList<>();
List<RexNode> subsearchUnionProjects = new ArrayList<>();

// 3.1 Start with main query's schema. If subsearch plan doesn't have matched column,
// add same type column in place with NULL literal
for (int i = 0; i < mainFields.size(); i++) {
mainUnionProjects.add(context.rexBuilder.makeInputRef(mainNode, i));
RelDataTypeField mainField = mainFields.get(i);
RelDataTypeField subsearchField = subsearchFieldMap.get(mainField.getName());
names.add(mainField.getName());
if (subsearchFieldMap.containsKey(mainField.getName())
&& subsearchField != null
&& subsearchField.getType().equals(mainField.getType())) {
subsearchUnionProjects.add(
context.rexBuilder.makeInputRef(subsearchNode, subsearchField.getIndex()));
isSelected[subsearchField.getIndex()] = true;
} else {
subsearchUnionProjects.add(context.rexBuilder.makeNullLiteral(mainField.getType()));
}

// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subsearchNode);
List<RelNode> projectedNodes =
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);

// 4. Union the projected plans
for (RelNode projectedNode : projectedNodes) {
context.relBuilder.push(projectedNode);
}
context.relBuilder.union(true);
return context.relBuilder.peek();
}

// 3.2 Add remaining subsearch columns to the merged schema
for (int j = 0; j < subsearchFields.size(); j++) {
RelDataTypeField subsearchField = subsearchFields.get(j);
if (!isSelected[j]) {
mainUnionProjects.add(context.rexBuilder.makeNullLiteral(subsearchField.getType()));
subsearchUnionProjects.add(context.rexBuilder.makeInputRef(subsearchNode, j));
names.add(subsearchField.getName());
@Override
public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
List<RelNode> subsearchNodes = new ArrayList<>();
for (UnresolvedPlan subsearch : node.getSubsearches()) {
UnresolvedPlan prunedSubSearch = subsearch.accept(new EmptySourcePropagateVisitor(), null);
prunedSubSearch.accept(this, context);
subsearchNodes.add(context.relBuilder.build());
}

// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> alignedNodes =
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(subsearchNodes, context);

for (RelNode alignedNode : alignedNodes) {
context.relBuilder.push(alignedNode);
}
context.relBuilder.union(true, alignedNodes.size());

RelDataType rowType = context.relBuilder.peek().getRowType();
String timestampField = findTimestampField(rowType);
if (timestampField != null) {
RelDataTypeField timestampFieldRef = rowType.getField(timestampField, false, false);
if (timestampFieldRef != null) {
RexNode timestampRef =
context.rexBuilder.makeInputRef(
context.relBuilder.peek(), timestampFieldRef.getIndex());
context.relBuilder.sort(context.relBuilder.desc(timestampRef));
}
}

// 3.3 Uniquify names in case the merged names have duplicates
List<String> uniqNames =
SqlValidatorUtil.uniquify(names, SqlValidatorUtil.EXPR_SUGGESTER, true);

// 4. Apply new schema over two query plans
RelNode projectedMainNode =
context.relBuilder.push(mainNode).project(mainUnionProjects, uniqNames).build();
RelNode projectedSubsearchNode =
context.relBuilder.push(subsearchNode).project(subsearchUnionProjects, uniqNames).build();

// 5. Union all two projected plans
context.relBuilder.push(projectedMainNode);
context.relBuilder.push(projectedSubsearchNode);
context.relBuilder.union(true);
return context.relBuilder.peek();
}

/**
* Finds the timestamp field for multisearch ordering.
*
* @param rowType The row type to search for timestamp fields
* @return The name of the timestamp field, or null if not found
*/
private String findTimestampField(RelDataType rowType) {
String[] candidates = {"@timestamp", "_time", "timestamp", "time"};
for (String fieldName : candidates) {
RelDataTypeField field = rowType.getField(fieldName, false, false);
if (field != null) {
return fieldName;
}
}
return null;
}

/*
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
*/
Expand Down
158 changes: 158 additions & 0 deletions core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlValidatorUtil;

/**
* Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses
* the same strategy as append command - renames conflicting fields to avoid type conflicts.
*/
public class SchemaUnifier {

/**
* Builds a unified schema for multiple nodes with type conflict resolution.
*
* @param nodes List of RelNodes to unify schemas for
* @param context Calcite plan context
* @return List of projected RelNodes with unified schema
*/
public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
List<RelNode> nodes, CalcitePlanContext context) {
if (nodes.isEmpty()) {
return new ArrayList<>();
}

if (nodes.size() == 1) {
return nodes;
}

// Step 1: Build the unified schema by processing all nodes
List<SchemaField> unifiedSchema = buildUnifiedSchema(nodes);

// Step 2: Create projections for each node to align with unified schema
List<RelNode> projectedNodes = new ArrayList<>();
List<String> fieldNames =
unifiedSchema.stream().map(SchemaField::getName).collect(Collectors.toList());

for (RelNode node : nodes) {
List<RexNode> projection = buildProjectionForNode(node, unifiedSchema, context);
RelNode projectedNode = context.relBuilder.push(node).project(projection, fieldNames).build();
projectedNodes.add(projectedNode);
}

// Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.)
List<String> uniqueNames =
SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true);

// Step 4: Re-project with unique names if needed
if (!uniqueNames.equals(fieldNames)) {
List<RelNode> renamedNodes = new ArrayList<>();
for (RelNode node : projectedNodes) {
RelNode renamedNode =
context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build();
renamedNodes.add(renamedNode);
}
return renamedNodes;
}

return projectedNodes;
}

/**
* Builds a unified schema by merging fields from all nodes. Fields with the same name but
* different types are added as separate entries (which will be renamed during uniquification).
*
* @param nodes List of RelNodes to merge schemas from
* @return List of SchemaField representing the unified schema (may contain duplicate names)
*/
private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
List<SchemaField> schema = new ArrayList<>();
Map<String, Set<RelDataType>> seenFields = new HashMap<>();

for (RelNode node : nodes) {
for (RelDataTypeField field : node.getRowType().getFieldList()) {
String fieldName = field.getName();
RelDataType fieldType = field.getType();

// Track which (name, type) combinations we've seen
Set<RelDataType> typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>());

if (!typesForName.contains(fieldType)) {
// New field or same name with different type - add to schema
schema.add(new SchemaField(fieldName, fieldType));
typesForName.add(fieldType);
}
// If we've seen this exact (name, type) combination, skip it
}
}

return schema;
}

/**
* Builds a projection for a node to align with the unified schema. For each field in the unified
* schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL
*
* @param node The node to build projection for
* @param unifiedSchema List of SchemaField representing the unified schema
* @param context Calcite plan context
* @return List of RexNode representing the projection
*/
private static List<RexNode> buildProjectionForNode(
RelNode node, List<SchemaField> unifiedSchema, CalcitePlanContext context) {
Map<String, RelDataTypeField> nodeFieldMap =
node.getRowType().getFieldList().stream()
.collect(Collectors.toMap(RelDataTypeField::getName, field -> field));

List<RexNode> projection = new ArrayList<>();
for (SchemaField schemaField : unifiedSchema) {
String fieldName = schemaField.getName();
RelDataType expectedType = schemaField.getType();
RelDataTypeField nodeField = nodeFieldMap.get(fieldName);

if (nodeField != null && nodeField.getType().equals(expectedType)) {
// Field exists with matching type - use it
projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex()));
} else {
// Field missing or type mismatch - project NULL
projection.add(context.rexBuilder.makeNullLiteral(expectedType));
}
}

return projection;
}

/** Represents a field in the unified schema with name and type. */
private static class SchemaField {
private final String name;
private final RelDataType type;

SchemaField(String name, RelDataType type) {
this.name = name;
this.type = type;
}

String getName() {
return name;
}

RelDataType getType() {
return type;
}
}
}
Loading
Loading