diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index d5c37d405ec..537b67d73d2 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -75,6 +75,7 @@ import org.opensearch.sql.ast.tree.Limit; import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; +import org.opensearch.sql.ast.tree.Multisearch; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -820,6 +821,11 @@ public LogicalPlan visitAppend(Append node, AnalysisContext context) { throw getOnlyForCalciteException("Append"); } + @Override + public LogicalPlan visitMultisearch(Multisearch node, AnalysisContext context) { + throw getOnlyForCalciteException("Multisearch"); + } + private LogicalSort buildSort( LogicalPlan child, AnalysisContext context, Integer count, List sortFields) { ExpressionReferenceOptimizer optimizer = diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 84f7bdbd4a6..e444b040763 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -63,6 +63,7 @@ import org.opensearch.sql.ast.tree.Limit; import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; +import org.opensearch.sql.ast.tree.Multisearch; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -431,4 +432,8 @@ public T visitAppendCol(AppendCol node, C context) { public T visitAppend(Append node, C context) { return visitChildren(node, context); } + + public T visitMultisearch(Multisearch node, C context) { + return visitChildren(node, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Multisearch.java b/core/src/main/java/org/opensearch/sql/ast/tree/Multisearch.java new file mode 100644 index 00000000000..a1acefdd70f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Multisearch.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; + +/** Logical plan node for Multisearch operation. Combines results from multiple search queries. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +public class Multisearch extends UnresolvedPlan { + + private UnresolvedPlan child; + private final List subsearches; + + public Multisearch(List subsearches) { + this.subsearches = subsearches; + } + + @Override + public Multisearch attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + if (this.child == null) { + return ImmutableList.copyOf(subsearches); + } else { + return ImmutableList.builder().add(this.child).addAll(subsearches).build(); + } + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitMultisearch(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 1f105f5f229..f237014cc0a 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -48,6 +48,7 @@ import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; @@ -60,7 +61,6 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelBuilder.AggCall; import org.apache.calcite.util.Holder; @@ -111,6 +111,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; import org.opensearch.sql.ast.tree.ML; +import org.opensearch.sql.ast.tree.Multisearch; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; @@ -1649,65 +1650,73 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) { node.getSubSearch().accept(new EmptySourcePropagateVisitor(), null); prunedSubSearch.accept(this, context); - // 3. Merge two query schemas + // 3. Merge two query schemas using shared logic RelNode subsearchNode = context.relBuilder.build(); RelNode mainNode = context.relBuilder.build(); - List mainFields = mainNode.getRowType().getFieldList(); - List subsearchFields = subsearchNode.getRowType().getFieldList(); - Map subsearchFieldMap = - subsearchFields.stream() - .map(typeField -> Pair.of(typeField.getName(), typeField)) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - boolean[] isSelected = new boolean[subsearchFields.size()]; - List names = new ArrayList<>(); - List mainUnionProjects = new ArrayList<>(); - List subsearchUnionProjects = new ArrayList<>(); - - // 3.1 Start with main query's schema. If subsearch plan doesn't have matched column, - // add same type column in place with NULL literal - for (int i = 0; i < mainFields.size(); i++) { - mainUnionProjects.add(context.rexBuilder.makeInputRef(mainNode, i)); - RelDataTypeField mainField = mainFields.get(i); - RelDataTypeField subsearchField = subsearchFieldMap.get(mainField.getName()); - names.add(mainField.getName()); - if (subsearchFieldMap.containsKey(mainField.getName()) - && subsearchField != null - && subsearchField.getType().equals(mainField.getType())) { - subsearchUnionProjects.add( - context.rexBuilder.makeInputRef(subsearchNode, subsearchField.getIndex())); - isSelected[subsearchField.getIndex()] = true; - } else { - subsearchUnionProjects.add(context.rexBuilder.makeNullLiteral(mainField.getType())); - } + + // Use shared schema merging logic that handles type conflicts via field renaming + List nodesToMerge = Arrays.asList(mainNode, subsearchNode); + List projectedNodes = + SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); + + // 4. Union the projected plans + for (RelNode projectedNode : projectedNodes) { + context.relBuilder.push(projectedNode); } + context.relBuilder.union(true); + return context.relBuilder.peek(); + } - // 3.2 Add remaining subsearch columns to the merged schema - for (int j = 0; j < subsearchFields.size(); j++) { - RelDataTypeField subsearchField = subsearchFields.get(j); - if (!isSelected[j]) { - mainUnionProjects.add(context.rexBuilder.makeNullLiteral(subsearchField.getType())); - subsearchUnionProjects.add(context.rexBuilder.makeInputRef(subsearchNode, j)); - names.add(subsearchField.getName()); + @Override + public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) { + List subsearchNodes = new ArrayList<>(); + for (UnresolvedPlan subsearch : node.getSubsearches()) { + UnresolvedPlan prunedSubSearch = subsearch.accept(new EmptySourcePropagateVisitor(), null); + prunedSubSearch.accept(this, context); + subsearchNodes.add(context.relBuilder.build()); + } + + // Use shared schema merging logic that handles type conflicts via field renaming + List alignedNodes = + SchemaUnifier.buildUnifiedSchemaWithConflictResolution(subsearchNodes, context); + + for (RelNode alignedNode : alignedNodes) { + context.relBuilder.push(alignedNode); + } + context.relBuilder.union(true, alignedNodes.size()); + + RelDataType rowType = context.relBuilder.peek().getRowType(); + String timestampField = findTimestampField(rowType); + if (timestampField != null) { + RelDataTypeField timestampFieldRef = rowType.getField(timestampField, false, false); + if (timestampFieldRef != null) { + RexNode timestampRef = + context.rexBuilder.makeInputRef( + context.relBuilder.peek(), timestampFieldRef.getIndex()); + context.relBuilder.sort(context.relBuilder.desc(timestampRef)); } } - // 3.3 Uniquify names in case the merged names have duplicates - List uniqNames = - SqlValidatorUtil.uniquify(names, SqlValidatorUtil.EXPR_SUGGESTER, true); - - // 4. Apply new schema over two query plans - RelNode projectedMainNode = - context.relBuilder.push(mainNode).project(mainUnionProjects, uniqNames).build(); - RelNode projectedSubsearchNode = - context.relBuilder.push(subsearchNode).project(subsearchUnionProjects, uniqNames).build(); - - // 5. Union all two projected plans - context.relBuilder.push(projectedMainNode); - context.relBuilder.push(projectedSubsearchNode); - context.relBuilder.union(true); return context.relBuilder.peek(); } + /** + * Finds the timestamp field for multisearch ordering. + * + * @param rowType The row type to search for timestamp fields + * @return The name of the timestamp field, or null if not found + */ + private String findTimestampField(RelDataType rowType) { + String[] candidates = {"@timestamp", "_time", "timestamp", "time"}; + for (String fieldName : candidates) { + RelDataTypeField field = rowType.getField(fieldName, false, false); + if (field != null) { + return fieldName; + } + } + return null; + } + /* * Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta */ diff --git a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java new file mode 100644 index 00000000000..627d1de8dc4 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java @@ -0,0 +1,158 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.validate.SqlValidatorUtil; + +/** + * Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses + * the same strategy as append command - renames conflicting fields to avoid type conflicts. + */ +public class SchemaUnifier { + + /** + * Builds a unified schema for multiple nodes with type conflict resolution. + * + * @param nodes List of RelNodes to unify schemas for + * @param context Calcite plan context + * @return List of projected RelNodes with unified schema + */ + public static List buildUnifiedSchemaWithConflictResolution( + List nodes, CalcitePlanContext context) { + if (nodes.isEmpty()) { + return new ArrayList<>(); + } + + if (nodes.size() == 1) { + return nodes; + } + + // Step 1: Build the unified schema by processing all nodes + List unifiedSchema = buildUnifiedSchema(nodes); + + // Step 2: Create projections for each node to align with unified schema + List projectedNodes = new ArrayList<>(); + List fieldNames = + unifiedSchema.stream().map(SchemaField::getName).collect(Collectors.toList()); + + for (RelNode node : nodes) { + List projection = buildProjectionForNode(node, unifiedSchema, context); + RelNode projectedNode = context.relBuilder.push(node).project(projection, fieldNames).build(); + projectedNodes.add(projectedNode); + } + + // Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.) + List uniqueNames = + SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true); + + // Step 4: Re-project with unique names if needed + if (!uniqueNames.equals(fieldNames)) { + List renamedNodes = new ArrayList<>(); + for (RelNode node : projectedNodes) { + RelNode renamedNode = + context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build(); + renamedNodes.add(renamedNode); + } + return renamedNodes; + } + + return projectedNodes; + } + + /** + * Builds a unified schema by merging fields from all nodes. Fields with the same name but + * different types are added as separate entries (which will be renamed during uniquification). + * + * @param nodes List of RelNodes to merge schemas from + * @return List of SchemaField representing the unified schema (may contain duplicate names) + */ + private static List buildUnifiedSchema(List nodes) { + List schema = new ArrayList<>(); + Map> seenFields = new HashMap<>(); + + for (RelNode node : nodes) { + for (RelDataTypeField field : node.getRowType().getFieldList()) { + String fieldName = field.getName(); + RelDataType fieldType = field.getType(); + + // Track which (name, type) combinations we've seen + Set typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>()); + + if (!typesForName.contains(fieldType)) { + // New field or same name with different type - add to schema + schema.add(new SchemaField(fieldName, fieldType)); + typesForName.add(fieldType); + } + // If we've seen this exact (name, type) combination, skip it + } + } + + return schema; + } + + /** + * Builds a projection for a node to align with the unified schema. For each field in the unified + * schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL + * + * @param node The node to build projection for + * @param unifiedSchema List of SchemaField representing the unified schema + * @param context Calcite plan context + * @return List of RexNode representing the projection + */ + private static List buildProjectionForNode( + RelNode node, List unifiedSchema, CalcitePlanContext context) { + Map nodeFieldMap = + node.getRowType().getFieldList().stream() + .collect(Collectors.toMap(RelDataTypeField::getName, field -> field)); + + List projection = new ArrayList<>(); + for (SchemaField schemaField : unifiedSchema) { + String fieldName = schemaField.getName(); + RelDataType expectedType = schemaField.getType(); + RelDataTypeField nodeField = nodeFieldMap.get(fieldName); + + if (nodeField != null && nodeField.getType().equals(expectedType)) { + // Field exists with matching type - use it + projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex())); + } else { + // Field missing or type mismatch - project NULL + projection.add(context.rexBuilder.makeNullLiteral(expectedType)); + } + } + + return projection; + } + + /** Represents a field in the unified schema with name and type. */ + private static class SchemaField { + private final String name; + private final RelDataType type; + + SchemaField(String name, RelDataType type) { + this.name = name; + this.type = type; + } + + String getName() { + return name; + } + + RelDataType getType() { + return type; + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/ast/tree/MultisearchTest.java b/core/src/test/java/org/opensearch/sql/ast/tree/MultisearchTest.java new file mode 100644 index 00000000000..afb26e77089 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/ast/tree/MultisearchTest.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.ast.dsl.AstDSL.compare; +import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.filter; +import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.relation; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.Node; + +class MultisearchTest { + + @Test + public void testMultisearchCreation() { + // Create subsearches + UnresolvedPlan subsearch1 = relation("table1"); + UnresolvedPlan subsearch2 = + filter(relation("table2"), compare(">", field("age"), intLiteral(30))); + List subsearches = ImmutableList.of(subsearch1, subsearch2); + + // Create multisearch + Multisearch multisearch = new Multisearch(subsearches); + + // Verify properties + assertEquals(subsearches, multisearch.getSubsearches()); + assertEquals(2, multisearch.getSubsearches().size()); + assertNotNull(multisearch.getChild()); + assertEquals(subsearches, multisearch.getChild()); + } + + @Test + public void testMultisearchWithChild() { + UnresolvedPlan subsearch1 = relation("table1"); + UnresolvedPlan subsearch2 = relation("table2"); + List subsearches = ImmutableList.of(subsearch1, subsearch2); + + Multisearch multisearch = new Multisearch(subsearches); + UnresolvedPlan mainQuery = relation("main_table"); + + // Attach child + multisearch.attach(mainQuery); + + // Verify child is attached + List children = multisearch.getChild(); + assertEquals(3, children.size()); // main query + 2 subsearches + assertEquals(mainQuery, children.get(0)); + assertEquals(subsearch1, children.get(1)); + assertEquals(subsearch2, children.get(2)); + } + + @Test + public void testMultisearchVisitorAccept() { + UnresolvedPlan subsearch = relation("table"); + Multisearch multisearch = new Multisearch(ImmutableList.of(subsearch)); + + // Test visitor pattern + TestVisitor visitor = new TestVisitor(); + String result = multisearch.accept(visitor, "test_context"); + + assertEquals("visitMultisearch_called_with_test_context", result); + } + + @Test + public void testMultisearchEqualsAndHashCode() { + UnresolvedPlan subsearch1 = relation("table1"); + UnresolvedPlan subsearch2 = relation("table2"); + List subsearches = ImmutableList.of(subsearch1, subsearch2); + + Multisearch multisearch1 = new Multisearch(subsearches); + Multisearch multisearch2 = new Multisearch(subsearches); + + assertEquals(multisearch1, multisearch2); + assertEquals(multisearch1.hashCode(), multisearch2.hashCode()); + } + + @Test + public void testMultisearchToString() { + UnresolvedPlan subsearch = relation("table"); + Multisearch multisearch = new Multisearch(ImmutableList.of(subsearch)); + + String toString = multisearch.toString(); + assertNotNull(toString); + assertTrue(toString.contains("Multisearch")); + } + + // Test visitor implementation + private static class TestVisitor extends AbstractNodeVisitor { + @Override + public String visitMultisearch(Multisearch node, String context) { + return "visitMultisearch_called_with_" + context; + } + } +} diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index 8135e3684d4..3b277cd978f 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -35,7 +35,7 @@ Example 1: Show All Indices Information SQL query:: os> SHOW TABLES LIKE '%' - fetched rows / total rows = 18/18 + fetched rows / total rows = 20/20 +----------------+-------------+------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+ | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | |----------------+-------------+------------------+------------+---------+----------+------------+-----------+---------------------------+----------------| @@ -52,6 +52,8 @@ SQL query:: | docTestCluster | null | otellogs | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | people | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | state_country | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | time_data | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | time_data2 | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | time_test | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | weblogs | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | wildcard | BASE TABLE | null | null | null | null | null | null | diff --git a/docs/user/ppl/cmd/multisearch.rst b/docs/user/ppl/cmd/multisearch.rst new file mode 100644 index 00000000000..10820badc54 --- /dev/null +++ b/docs/user/ppl/cmd/multisearch.rst @@ -0,0 +1,182 @@ +============= +multisearch +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| (Experimental) +| Using ``multisearch`` command to run multiple search subsearches and merge their results together. The command allows you to combine data from different queries on the same or different sources, and optionally apply subsequent processing to the combined result set. + +| Key aspects of ``multisearch``: + +1. Combines results from multiple search operations into a single result set. +2. Each subsearch can have different filtering criteria, data transformations, and field selections. +3. Results are merged and can be further processed with aggregations, sorting, and other PPL commands. +4. Particularly useful for comparative analysis, union operations, and creating comprehensive datasets from multiple search criteria. +5. Supports timestamp-based result interleaving when working with time-series data. + +| Use Cases: + +* **Comparative Analysis**: Compare metrics across different segments, regions, or time periods +* **Success Rate Monitoring**: Calculate success rates by comparing successful vs. total operations +* **Multi-source Data Combination**: Merge data from different indices or apply different filters to the same source +* **A/B Testing Analysis**: Combine results from different test groups for comparison +* **Time-series Data Merging**: Interleave events from multiple sources based on timestamps + +Version +======= +3.3.0 + +Syntax +====== +| multisearch ... + +**Requirements:** + +* **Minimum 2 subsearches required** - multisearch must contain at least two subsearch blocks +* **Maximum unlimited** - you can specify as many subsearches as needed + +**Subsearch Format:** + +* Each subsearch must be enclosed in square brackets: ``[search ...]`` +* Each subsearch must start with the ``search`` keyword +* Syntax: ``[search source=index | commands...]`` +* Description: Each subsearch is a complete search pipeline enclosed in square brackets + * Supported commands in subsearches: All PPL commands are supported (``where``, ``eval``, ``fields``, ``head``, ``rename``, ``stats``, ``sort``, ``dedup``, etc.) + +* result-processing: optional. Commands applied to the merged results. + + * Description: After the multisearch operation, you can apply any PPL command to process the combined results, such as ``stats``, ``sort``, ``head``, etc. + +Limitations +=========== + +* **Minimum Subsearches**: At least two subsearches must be specified +* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the system automatically resolves conflicts by renaming the conflicting fields. The first occurrence retains the original name, while subsequent conflicting fields are renamed with a numeric suffix (e.g., ``age`` becomes ``age0``, ``age1``, etc.). This ensures all data is preserved while maintaining schema consistency. + +Usage +===== + +Basic multisearch:: + + | multisearch [search source=table | where condition1] [search source=table | where condition2] + | multisearch [search source=index1 | fields field1, field2] [search source=index2 | fields field1, field2] + | multisearch [search source=table | where status="success"] [search source=table | where status="error"] + +Example 1: Basic Age Group Analysis +=================================== + +Combine young and adult customers into a single result set for further analysis. + +PPL query:: + + os> | multisearch [search source=accounts | where age < 30 | eval age_group = "young" | fields firstname, age, age_group] [search source=accounts | where age >= 30 | eval age_group = "adult" | fields firstname, age, age_group] | sort age; + fetched rows / total rows = 4/4 + +-----------+-----+-----------+ + | firstname | age | age_group | + |-----------+-----+-----------| + | Nanette | 28 | young | + | Amber | 32 | adult | + | Hattie | 36 | adult | + | Dale | 37 | adult | + +-----------+-----+-----------+ + +Example 2: Success Rate Pattern +=============================== + +Combine high-balance and all valid accounts for comparison analysis. + +PPL query:: + + os> | multisearch [search source=accounts | where balance > 20000 | eval query_type = "high_balance" | fields firstname, balance, query_type] [search source=accounts | where balance > 0 AND balance <= 20000 | eval query_type = "regular" | fields firstname, balance, query_type] | sort balance desc; + fetched rows / total rows = 4/4 + +-----------+---------+-------------+ + | firstname | balance | query_type | + |-----------+---------+-------------| + | Amber | 39225 | high_balance| + | Nanette | 32838 | high_balance| + | Hattie | 5686 | regular | + | Dale | 4180 | regular | + +-----------+---------+-------------+ + +Example 3: Timestamp Interleaving +================================== + +Combine time-series data from multiple sources with automatic timestamp-based ordering. + +PPL query:: + + os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | head 5; + fetched rows / total rows = 5/5 + +-------+---------------------+----------+-------+---------------------+ + | index | @timestamp | category | value | timestamp | + |-------+---------------------+----------+-------+---------------------| + | null | 2025-08-01 04:00:00 | E | 2001 | 2025-08-01 04:00:00 | + | null | 2025-08-01 03:47:41 | A | 8762 | 2025-08-01 03:47:41 | + | null | 2025-08-01 02:30:00 | F | 2002 | 2025-08-01 02:30:00 | + | null | 2025-08-01 01:14:11 | B | 9015 | 2025-08-01 01:14:11 | + | null | 2025-08-01 01:00:00 | E | 2003 | 2025-08-01 01:00:00 | + +-------+---------------------+----------+-------+---------------------+ + +Example 4: Handling Empty Results +================================== + +Multisearch gracefully handles cases where some subsearches return no results. + +PPL query:: + + os> | multisearch [search source=accounts | where age > 25 | fields firstname, age] [search source=accounts | where age > 200 | eval impossible = "yes" | fields firstname, age, impossible] | head 5; + fetched rows / total rows = 4/4 + +-----------+-----+------------+ + | firstname | age | impossible | + |-----------+-----+------------| + | Nanette | 28 | null | + | Amber | 32 | null | + | Hattie | 36 | null | + | Dale | 37 | null | + +-----------+-----+------------+ + +Example 5: Type Compatibility - Missing Fields +================================================= + +Demonstrate how missing fields are handled with NULL insertion. + +PPL query:: + + os> | multisearch [search source=accounts | where age < 30 | eval young_flag = "yes" | fields firstname, age, young_flag] [search source=accounts | where age >= 30 | fields firstname, age] | sort age; + fetched rows / total rows = 4/4 + +-----------+-----+------------+ + | firstname | age | young_flag | + |-----------+-----+------------| + | Nanette | 28 | yes | + | Amber | 32 | null | + | Hattie | 36 | null | + | Dale | 37 | null | + +-----------+-----+------------+ + +Example 6: Type Conflict Resolution - Automatic Renaming +=========================================================== + +When the same field name has incompatible types across subsearches, the system automatically renames conflicting fields with numeric suffixes. + +PPL query:: + + os> | multisearch [search source=accounts | fields firstname, age, balance | head 2] [search source=locations | fields description, age, place_id | head 2]; + fetched rows / total rows = 4/4 + +-----------+-----+---------+------------------+------+----------+ + | firstname | age | balance | description | age0 | place_id | + |-----------+-----+---------+------------------+------+----------| + | Amber | 32 | 39225 | null | null | null | + | Hattie | 36 | 5686 | null | null | null | + | null | null| null | Central Park | old | 1001 | + | null | null| null | Times Square | modern| 1002 | + +-----------+-----+---------+------------------+------+----------+ + +In this example, the ``age`` field has type ``bigint`` in accounts but type ``string`` in locations. The system keeps the first occurrence as ``age`` (bigint) and renames the second occurrence to ``age0`` (string), preserving all data while avoiding type conflicts. diff --git a/docs/user/ppl/index.rst b/docs/user/ppl/index.rst index 7f329d28773..36065997a42 100644 --- a/docs/user/ppl/index.rst +++ b/docs/user/ppl/index.rst @@ -88,6 +88,8 @@ The query start with search command and then flowing a set of command delimited - `ml command `_ + - `multisearch command `_ + - `parse command `_ - `patterns command `_ diff --git a/doctest/test_data/time_test_data.json b/doctest/test_data/time_test_data.json new file mode 100644 index 00000000000..841beb04700 --- /dev/null +++ b/doctest/test_data/time_test_data.json @@ -0,0 +1,40 @@ +{"index":{"_id":"1"}} +{"timestamp":"2025-07-31T08:58:54","value":6795,"category":"D","@timestamp":"2025-07-31T08:58:54"} +{"index":{"_id":"2"}} +{"timestamp":"2025-07-31T09:45:39","value":8571,"category":"B","@timestamp":"2025-07-31T09:45:39"} +{"index":{"_id":"3"}} +{"timestamp":"2025-07-31T10:32:24","value":7238,"category":"C","@timestamp":"2025-07-31T10:32:24"} +{"index":{"_id":"4"}} +{"timestamp":"2025-07-31T11:19:09","value":8914,"category":"A","@timestamp":"2025-07-31T11:19:09"} +{"index":{"_id":"5"}} +{"timestamp":"2025-07-31T12:06:02","value":6580,"category":"D","@timestamp":"2025-07-31T12:06:02"} +{"index":{"_id":"6"}} +{"timestamp":"2025-07-31T13:52:47","value":9102,"category":"B","@timestamp":"2025-07-31T13:52:47"} +{"index":{"_id":"7"}} +{"timestamp":"2025-07-31T14:39:32","value":7425,"category":"C","@timestamp":"2025-07-31T14:39:32"} +{"index":{"_id":"8"}} +{"timestamp":"2025-07-31T15:26:17","value":8753,"category":"A","@timestamp":"2025-07-31T15:26:17"} +{"index":{"_id":"9"}} +{"timestamp":"2025-07-31T16:13:10","value":6338,"category":"D","@timestamp":"2025-07-31T16:13:10"} +{"index":{"_id":"10"}} +{"timestamp":"2025-07-31T17:59:55","value":8909,"category":"B","@timestamp":"2025-07-31T17:59:55"} +{"index":{"_id":"11"}} +{"timestamp":"2025-07-31T18:46:40","value":7682,"category":"C","@timestamp":"2025-07-31T18:46:40"} +{"index":{"_id":"12"}} +{"timestamp":"2025-07-31T19:33:25","value":9231,"category":"A","@timestamp":"2025-07-31T19:33:25"} +{"index":{"_id":"13"}} +{"timestamp":"2025-07-31T20:20:18","value":6824,"category":"D","@timestamp":"2025-07-31T20:20:18"} +{"index":{"_id":"14"}} +{"timestamp":"2025-07-31T21:07:03","value":8490,"category":"B","@timestamp":"2025-07-31T21:07:03"} +{"index":{"_id":"15"}} +{"timestamp":"2025-07-31T22:53:48","value":7153,"category":"C","@timestamp":"2025-07-31T22:53:48"} +{"index":{"_id":"16"}} +{"timestamp":"2025-07-31T23:40:33","value":8676,"category":"A","@timestamp":"2025-07-31T23:40:33"} +{"index":{"_id":"17"}} +{"timestamp":"2025-08-01T00:27:26","value":6489,"category":"D","@timestamp":"2025-08-01T00:27:26"} +{"index":{"_id":"18"}} +{"timestamp":"2025-08-01T01:14:11","value":9015,"category":"B","@timestamp":"2025-08-01T01:14:11"} +{"index":{"_id":"19"}} +{"timestamp":"2025-08-01T02:00:56","value":7348,"category":"C","@timestamp":"2025-08-01T02:00:56"} +{"index":{"_id":"20"}} +{"timestamp":"2025-08-01T03:47:41","value":8762,"category":"A","@timestamp":"2025-08-01T03:47:41"} diff --git a/doctest/test_data/time_test_data2.json b/doctest/test_data/time_test_data2.json new file mode 100644 index 00000000000..db92260798c --- /dev/null +++ b/doctest/test_data/time_test_data2.json @@ -0,0 +1,40 @@ +{"index":{"_id":"1"}} +{"timestamp":"2025-08-01T04:00:00","value":2001,"category":"E","@timestamp":"2025-08-01T04:00:00"} +{"index":{"_id":"2"}} +{"timestamp":"2025-08-01T02:30:00","value":2002,"category":"F","@timestamp":"2025-08-01T02:30:00"} +{"index":{"_id":"3"}} +{"timestamp":"2025-08-01T01:00:00","value":2003,"category":"E","@timestamp":"2025-08-01T01:00:00"} +{"index":{"_id":"4"}} +{"timestamp":"2025-07-31T22:15:00","value":2004,"category":"F","@timestamp":"2025-07-31T22:15:00"} +{"index":{"_id":"5"}} +{"timestamp":"2025-07-31T20:45:00","value":2005,"category":"E","@timestamp":"2025-07-31T20:45:00"} +{"index":{"_id":"6"}} +{"timestamp":"2025-07-31T18:30:00","value":2006,"category":"F","@timestamp":"2025-07-31T18:30:00"} +{"index":{"_id":"7"}} +{"timestamp":"2025-07-31T16:00:00","value":2007,"category":"E","@timestamp":"2025-07-31T16:00:00"} +{"index":{"_id":"8"}} +{"timestamp":"2025-07-31T14:15:00","value":2008,"category":"F","@timestamp":"2025-07-31T14:15:00"} +{"index":{"_id":"9"}} +{"timestamp":"2025-07-31T12:30:00","value":2009,"category":"E","@timestamp":"2025-07-31T12:30:00"} +{"index":{"_id":"10"}} +{"timestamp":"2025-07-31T10:45:00","value":2010,"category":"F","@timestamp":"2025-07-31T10:45:00"} +{"index":{"_id":"11"}} +{"timestamp":"2025-07-31T08:00:00","value":2011,"category":"E","@timestamp":"2025-07-31T08:00:00"} +{"index":{"_id":"12"}} +{"timestamp":"2025-07-31T06:15:00","value":2012,"category":"F","@timestamp":"2025-07-31T06:15:00"} +{"index":{"_id":"13"}} +{"timestamp":"2025-07-31T04:30:00","value":2013,"category":"E","@timestamp":"2025-07-31T04:30:00"} +{"index":{"_id":"14"}} +{"timestamp":"2025-07-31T02:45:00","value":2014,"category":"F","@timestamp":"2025-07-31T02:45:00"} +{"index":{"_id":"15"}} +{"timestamp":"2025-07-31T01:00:00","value":2015,"category":"E","@timestamp":"2025-07-31T01:00:00"} +{"index":{"_id":"16"}} +{"timestamp":"2025-07-30T23:15:00","value":2016,"category":"F","@timestamp":"2025-07-30T23:15:00"} +{"index":{"_id":"17"}} +{"timestamp":"2025-07-30T21:30:00","value":2017,"category":"E","@timestamp":"2025-07-30T21:30:00"} +{"index":{"_id":"18"}} +{"timestamp":"2025-07-30T19:45:00","value":2018,"category":"F","@timestamp":"2025-07-30T19:45:00"} +{"index":{"_id":"19"}} +{"timestamp":"2025-07-30T18:00:00","value":2019,"category":"E","@timestamp":"2025-07-30T18:00:00"} +{"index":{"_id":"20"}} +{"timestamp":"2025-07-30T16:15:00","value":2020,"category":"F","@timestamp":"2025-07-30T16:15:00"} diff --git a/doctest/test_docs.py b/doctest/test_docs.py index c94378ed537..6d85fe3c983 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -42,6 +42,8 @@ 'work_information': 'work_information.json', 'events': 'events.json', 'otellogs': 'otellogs.json', + 'time_data': 'time_test_data.json', + 'time_data2': 'time_test_data2.json', 'time_test': 'time_test.json' } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java index d94bea7be77..3a512ca635f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java @@ -46,6 +46,7 @@ CalciteLegacyAPICompatibilityIT.class, CalciteLikeQueryIT.class, CalciteMathematicalFunctionIT.class, + CalciteMultisearchCommandIT.class, CalciteMultiValueStatsIT.class, CalciteNewAddedCommandsIT.class, CalciteNowLikeFunctionIT.class, diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 279cfd94b3c..8786a456b3e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -28,6 +28,7 @@ public void init() throws Exception { loadIndex(Index.BANK_WITH_STRING_VALUES); loadIndex(Index.NESTED_SIMPLE); loadIndex(Index.TIME_TEST_DATA); + loadIndex(Index.TIME_TEST_DATA2); loadIndex(Index.EVENTS); loadIndex(Index.LOGS); } @@ -152,6 +153,30 @@ public void testExplainIsEmpty() throws IOException { "source=opensearch-sql_test_index_account | where isempty(firstname)")); } + @Test + public void testExplainMultisearchBasic() throws IOException { + String query = + "| multisearch [search" + + " source=opensearch-sql_test_index_account | where age < 30 | eval age_group =" + + " 'young'] [search source=opensearch-sql_test_index_account | where age >= 30 | eval" + + " age_group = 'adult'] | stats count by age_group"; + var result = explainQueryToString(query); + String expected = loadExpectedPlan("explain_multisearch_basic.yaml"); + assertYamlEqualsJsonIgnoreId(expected, result); + } + + @Test + public void testExplainMultisearchTimestampInterleaving() throws IOException { + String query = + "| multisearch " + + "[search source=opensearch-sql_test_index_time_data | where category IN ('A', 'B')] " + + "[search source=opensearch-sql_test_index_time_data2 | where category IN ('E', 'F')] " + + "| head 5"; + var result = explainQueryToString(query); + String expected = loadExpectedPlan("explain_multisearch_timestamp.yaml"); + assertYamlEqualsJsonIgnoreId(expected, result); + } + // Only for Calcite @Test public void testExplainIsBlank() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java new file mode 100644 index 00000000000..72772c1cd84 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java @@ -0,0 +1,369 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.*; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteMultisearchCommandIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + loadIndex(Index.TIME_TEST_DATA); + loadIndex(Index.TIME_TEST_DATA2); + loadIndex(Index.LOCATIONS_TYPE_CONFLICT); + } + + @Test + public void testBasicMultisearch() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | where age < 30 | eval age_group = \\\"young\\\"] " + + "[search source=%s | where age >= 30 | eval age_group = \\\"adult\\\"] " + + "| stats count by age_group | sort age_group", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema(result, schema("count", null, "bigint"), schema("age_group", null, "string")); + verifyDataRows(result, rows(549L, "adult"), rows(451L, "young")); + } + + @Test + public void testMultisearchSuccessRatePattern() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | where balance > 20000 | eval query_type = \\\"good\\\"] " + + "[search source=%s | where balance > 0 | eval query_type = \\\"valid\\\"] " + + "| stats count(eval(query_type = \\\"good\\\")) as good_accounts, " + + " count(eval(query_type = \\\"valid\\\")) as total_valid", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema( + result, schema("good_accounts", null, "bigint"), schema("total_valid", null, "bigint")); + + verifyDataRows(result, rows(619L, 1000L)); + } + + @Test + public void testMultisearchWithThreeSubsearches() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where state = \\\"IL\\\" | eval region" + + " = \\\"Illinois\\\"] [search source=%s | where state = \\\"TN\\\" | eval" + + " region = \\\"Tennessee\\\"] [search source=%s | where state = \\\"CA\\\" |" + + " eval region = \\\"California\\\"] | stats count by region | sort region", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema(result, schema("count", null, "bigint"), schema("region", null, "string")); + + verifyDataRows(result, rows(17L, "California"), rows(22L, "Illinois"), rows(25L, "Tennessee")); + } + + @Test + public void testMultisearchWithComplexAggregation() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where gender = \\\"M\\\" | eval" + + " segment = \\\"male\\\"] [search source=%s | where gender = \\\"F\\\" | eval" + + " segment = \\\"female\\\"] | stats count as customer_count, avg(balance) as" + + " avg_balance by segment | sort segment", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema( + result, + schema("customer_count", null, "bigint"), + schema("avg_balance", null, "double"), + schema("segment", null, "string")); + + verifyDataRows( + result, rows(493L, 25623.34685598377, "female"), rows(507L, 25803.800788954635, "male")); + } + + @Test + public void testMultisearchWithEmptySubsearch() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | where age > 25] " + + "[search source=%s | where age > 200 | eval impossible = \\\"yes\\\"] " + + "| stats count", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema(result, schema("count", null, "bigint")); + + verifyDataRows(result, rows(733L)); + } + + @Test + public void testMultisearchWithFieldsProjection() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where gender = \\\"M\\\" | fields" + + " firstname, lastname, balance] [search source=%s | where gender = \\\"F\\\"" + + " | fields firstname, lastname, balance] | head 5", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema( + result, + schema("firstname", null, "string"), + schema("lastname", null, "string"), + schema("balance", null, "bigint")); + + verifyDataRows( + result, + rows("Amber", "Duke", 39225L), + rows("Hattie", "Bond", 5686L), + rows("Dale", "Adams", 4180L), + rows("Elinor", "Ratliff", 16418L), + rows("Mcgee", "Mooney", 18612L)); + } + + @Test + public void testMultisearchWithTimestampInterleaving() throws IOException { + JSONObject result = + executeQuery( + "| multisearch [search" + + " source=opensearch-sql_test_index_time_data | where category IN (\\\"A\\\"," + + " \\\"B\\\")] [search source=opensearch-sql_test_index_time_data2 | where" + + " category IN (\\\"E\\\", \\\"F\\\")] | head 10"); + + verifySchema( + result, + schema("@timestamp", null, "string"), + schema("category", null, "string"), + schema("value", null, "int"), + schema("timestamp", null, "string")); + + verifyDataRows( + result, + rows("2025-08-01 04:00:00", "E", 2001, "2025-08-01 04:00:00"), + rows("2025-08-01 03:47:41", "A", 8762, "2025-08-01 03:47:41"), + rows("2025-08-01 02:30:00", "F", 2002, "2025-08-01 02:30:00"), + rows("2025-08-01 01:14:11", "B", 9015, "2025-08-01 01:14:11"), + rows("2025-08-01 01:00:00", "E", 2003, "2025-08-01 01:00:00"), + rows("2025-07-31 23:40:33", "A", 8676, "2025-07-31 23:40:33"), + rows("2025-07-31 22:15:00", "F", 2004, "2025-07-31 22:15:00"), + rows("2025-07-31 21:07:03", "B", 8490, "2025-07-31 21:07:03"), + rows("2025-07-31 20:45:00", "E", 2005, "2025-07-31 20:45:00"), + rows("2025-07-31 19:33:25", "A", 9231, "2025-07-31 19:33:25")); + } + + @Test + public void testMultisearchWithNonStreamingCommands() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | where age < 30 | stats count() as young_count] " + + "[search source=%s | where age >= 30 | stats count() as adult_count] " + + "| stats sum(young_count) as total_young, sum(adult_count) as total_adult", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema( + result, schema("total_young", null, "bigint"), schema("total_adult", null, "bigint")); + + verifyDataRows(result, rows(451L, 549L)); + } + + @Test + public void testMultisearchWithSingleSubsearchThrowsError() { + Exception exception = + assertThrows( + ResponseException.class, + () -> + executeQuery( + String.format( + "| multisearch " + "[search source=%s | where age > 30]", + TEST_INDEX_ACCOUNT))); + + assertTrue( + "Error message should indicate minimum subsearch requirement", + exception.getMessage().contains("Multisearch command requires at least two subsearches")); + } + + @Test + public void testMultisearchWithDifferentIndicesSchemaMerge() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where age > 35 | fields account_number," + + " firstname, age, balance] [search source=%s | where age > 35 | fields" + + " account_number, balance, age] | stats count() as total_count", + TEST_INDEX_ACCOUNT, TEST_INDEX_BANK)); + + verifySchema(result, schema("total_count", null, "bigint")); + verifyDataRows(result, rows(241L)); + } + + @Test + public void testMultisearchWithMixedIndicesComplexSchemaMerge() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where balance > 40000 | eval record_type =" + + " \\\"financial\\\" | fields account_number, balance, record_type] [search" + + " source=%s | where value > 5000 | eval record_type = \\\"timeseries\\\" |" + + " fields value, category, record_type] | stats count by record_type | sort" + + " record_type", + TEST_INDEX_ACCOUNT, "opensearch-sql_test_index_time_data")); + + verifySchema(result, schema("count", null, "bigint"), schema("record_type", null, "string")); + verifyDataRows(result, rows(215L, "financial"), rows(100L, "timeseries")); + } + + @Test + public void testMultisearchWithTimeIndicesTimestampOrdering() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | where category = \\\"A\\\" | stats count() as count_a] " + + "[search source=%s | where category = \\\"E\\\" | stats count() as count_e] " + + "| stats sum(count_a) as total_a, sum(count_e) as total_e", + "opensearch-sql_test_index_time_data", "opensearch-sql_test_index_time_data2")); + + verifySchema(result, schema("total_a", null, "bigint"), schema("total_e", null, "bigint")); + verifyDataRows(result, rows(26L, 10L)); + } + + @Test + public void testMultisearchNullFillingForMissingFields() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where account_number = 1 | fields firstname," + + " age, balance] [search source=%s | where account_number = 1 | fields" + + " lastname, city, employer] | head 2", + TEST_INDEX_ACCOUNT, TEST_INDEX_ACCOUNT)); + + verifySchema( + result, + schema("firstname", null, "string"), + schema("age", null, "bigint"), + schema("balance", null, "bigint"), + schema("lastname", null, "string"), + schema("city", null, "string"), + schema("employer", null, "string")); + + verifyDataRows( + result, + rows("Amber", 32L, 39225L, null, null, null), + rows(null, null, null, "Duke", "Brogan", "Pyrami")); + } + + @Test + public void testMultisearchNullFillingAcrossIndices() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch [search source=%s | where account_number = 1 | fields" + + " account_number, firstname, balance] [search source=%s | where" + + " account_number = 1 | fields city, employer, email] | head 2", + TEST_INDEX_ACCOUNT, TEST_INDEX_BANK)); + + verifySchema( + result, + schema("account_number", null, "bigint"), + schema("firstname", null, "string"), + schema("balance", null, "bigint"), + schema("city", null, "string"), + schema("employer", null, "string"), + schema("email", null, "string")); + + verifyDataRows( + result, + rows(1L, "Amber", 39225L, null, null, null), + rows(null, null, null, "Brogan", "Pyrami", "amberduke@pyrami.com")); + } + + @Test + public void testMultisearchWithDirectTypeConflict() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | fields firstname, age, balance | head 2] " + + "[search source=%s | fields description, age, place_id | head 2]", + TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)); + + verifySchema( + result, + schema("firstname", null, "string"), + schema("age", null, "bigint"), + schema("balance", null, "bigint"), + schema("description", null, "string"), + schema("age0", null, "string"), + schema("place_id", null, "int")); + + verifyDataRows( + result, + rows("Amber", 32L, 39225L, null, null, null), + rows("Hattie", 36L, 5686L, null, null, null), + rows(null, null, null, "Central Park", "old", 1001), + rows(null, null, null, "Times Square", "modern", 1002)); + } + + @Test + public void testMultisearchCrossIndexFieldSelection() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | fields firstname, balance | head 2] " + + "[search source=%s | fields description, place_id | head 2]", + TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)); + + verifySchema( + result, + schema("firstname", null, "string"), + schema("balance", null, "bigint"), + schema("description", null, "string"), + schema("place_id", null, "int")); + + verifyDataRows( + result, + rows("Amber", 39225L, null, null), + rows("Hattie", 5686L, null, null), + rows(null, null, "Central Park", 1001), + rows(null, null, "Times Square", 1002)); + } + + @Test + public void testMultisearchTypeConflictWithStats() throws IOException { + JSONObject result = + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | fields age] " + + "[search source=%s | fields age] " + + "| stats count() as total", + TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)); + + verifySchema(result, schema("total", null, "bigint")); + + verifyDataRows(result, rows(1010L)); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 4e258088b5e..fb3cad3f9f4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -565,6 +565,11 @@ public enum Index { "location2", getLocationIndexMapping(), "src/test/resources/locations2.json"), + LOCATIONS_TYPE_CONFLICT( + TestsConstants.TEST_INDEX_LOCATIONS_TYPE_CONFLICT, + "locations", + getLocationsTypeConflictIndexMapping(), + "src/test/resources/locations_type_conflict.json"), NESTED( TestsConstants.TEST_INDEX_NESTED_TYPE, "nestedType", @@ -635,6 +640,11 @@ public enum Index { "_doc", getOrderIndexMapping(), "src/test/resources/order.json"), + TIME_TEST_DATA2( + "opensearch-sql_test_index_time_data2", + "time_data", + getMappingFile("time_test_data_index_mapping.json"), + "src/test/resources/time_test_data2.json"), WEBLOG( TestsConstants.TEST_INDEX_WEBLOGS, "weblogs", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index 09a24269692..a94e89ec0e6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -170,6 +170,11 @@ public static String getLocationIndexMapping() { return getMappingFile(mappingFile); } + public static String getLocationsTypeConflictIndexMapping() { + String mappingFile = "locations_type_conflict_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static String getEmployeeNestedTypeIndexMapping() { String mappingFile = "employee_nested_type_index_mapping.json"; return getMappingFile(mappingFile); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 17e30e5938c..76923dbd984 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -27,6 +27,8 @@ public class TestsConstants { public static final String TEST_INDEX_ODBC = TEST_INDEX + "_odbc"; public static final String TEST_INDEX_LOCATION = TEST_INDEX + "_location"; public static final String TEST_INDEX_LOCATION2 = TEST_INDEX + "_location2"; + public static final String TEST_INDEX_LOCATIONS_TYPE_CONFLICT = + TEST_INDEX + "_locations_type_conflict"; public static final String TEST_INDEX_NESTED_TYPE = TEST_INDEX + "_nested_type"; public static final String TEST_INDEX_NESTED_TYPE_WITHOUT_ARRAYS = TEST_INDEX + "_nested_type_without_arrays"; diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_basic.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_basic.yaml new file mode 100644 index 00000000000..8fe5241ced4 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_basic.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(count=[$1], age_group=[$0]) + LogicalAggregate(group=[{0}], count=[COUNT()]) + LogicalProject(age_group=[$11]) + LogicalUnion(all=[true]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], age_group=['young':VARCHAR]) + LogicalFilter(condition=[<($8, 30)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], age_group=['adult':VARCHAR]) + LogicalFilter(condition=[>=($8, 30)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], count=[$t1], age_group=[$t0]) + EnumerableAggregate(group=[{0}], count=[COUNT()]) + EnumerableUnion(all=[true]) + EnumerableCalc(expr#0=[{inputs}], expr#1=['young':VARCHAR], age_group=[$t1]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], FILTER-><($0, 30)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"range":{"age":{"from":null,"to":30,"include_lower":true,"include_upper":false,"boost":1.0}}},"_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + EnumerableCalc(expr#0=[{inputs}], expr#1=['adult':VARCHAR], age_group=[$t1]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], FILTER->>=($0, 30)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"range":{"age":{"from":30,"to":null,"include_lower":true,"include_upper":true,"boost":1.0}}},"_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yaml new file mode 100644 index 00000000000..b38889b0323 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yaml @@ -0,0 +1,27 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], dir0=[DESC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], dir0=[DESC], fetch=[5]) + LogicalUnion(all=[true]) + LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3]) + LogicalFilter(condition=[SEARCH($1, Sarg['A':VARCHAR, 'B':VARCHAR]:VARCHAR)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]]) + LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3]) + LogicalFilter(condition=[SEARCH($1, Sarg['E':VARCHAR, 'F':VARCHAR]:VARCHAR)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data2]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableLimit(fetch=[5]) + EnumerableMergeUnion(all=[true]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[@timestamp, category, value, timestamp], FILTER->SEARCH($1, Sarg['A':VARCHAR, 'B':VARCHAR]:VARCHAR), SORT->[{ + "@timestamp" : { + "order" : "desc", + "missing" : "_first" + } + }], LIMIT->5, LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","query":{"terms":{"category":["A","B"],"boost":1.0}},"_source":{"includes":["@timestamp","category","value","timestamp"],"excludes":[]},"sort":[{"@timestamp":{"order":"desc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data2]], PushDownContext=[[PROJECT->[@timestamp, category, value, timestamp], FILTER->SEARCH($1, Sarg['E':VARCHAR, 'F':VARCHAR]:VARCHAR), SORT->[{ + "@timestamp" : { + "order" : "desc", + "missing" : "_first" + } + }], LIMIT->5, LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","query":{"terms":{"category":["E","F"],"boost":1.0}},"_source":{"includes":["@timestamp","category","value","timestamp"],"excludes":[]},"sort":[{"@timestamp":{"order":"desc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_multisearch_basic.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_multisearch_basic.yaml new file mode 100644 index 00000000000..4910dc0a253 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_multisearch_basic.yaml @@ -0,0 +1,22 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(count=[$1], age_group=[$0]) + LogicalAggregate(group=[{0}], count=[COUNT()]) + LogicalProject(age_group=[$11]) + LogicalUnion(all=[true]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], age_group=['young':VARCHAR]) + LogicalFilter(condition=[<($8, 30)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], age_group=['adult':VARCHAR]) + LogicalFilter(condition=[>=($8, 30)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], count=[$t1], age_group=[$t0]) + EnumerableAggregate(group=[{0}], count=[COUNT()]) + EnumerableUnion(all=[true]) + EnumerableCalc(expr#0..16=[{inputs}], expr#17=['young':VARCHAR], expr#18=[30], expr#19=[<($t8, $t18)], age_group=[$t17], $condition=[$t19]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..16=[{inputs}], expr#17=['adult':VARCHAR], expr#18=[30], expr#19=[>=($t8, $t18)], age_group=[$t17], $condition=[$t19]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_multisearch_timestamp.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_multisearch_timestamp.yaml new file mode 100644 index 00000000000..64b0ff25050 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_multisearch_timestamp.yaml @@ -0,0 +1,25 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], dir0=[DESC], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$0], dir0=[DESC], fetch=[5]) + LogicalUnion(all=[true]) + LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3]) + LogicalFilter(condition=[SEARCH($1, Sarg['A':VARCHAR, 'B':VARCHAR]:VARCHAR)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]]) + LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3]) + LogicalFilter(condition=[SEARCH($1, Sarg['E':VARCHAR, 'F':VARCHAR]:VARCHAR)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data2]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableLimit(fetch=[5]) + EnumerableMergeUnion(all=[true]) + EnumerableCalc(expr#0..9=[{inputs}], proj#0..3=[{exprs}]) + EnumerableLimit(fetch=[5]) + EnumerableSort(sort0=[$0], dir0=[DESC]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[Sarg['A':VARCHAR, 'B':VARCHAR]:VARCHAR], expr#11=[SEARCH($t1, $t10)], proj#0..9=[{exprs}], $condition=[$t11]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]]) + EnumerableCalc(expr#0..9=[{inputs}], proj#0..3=[{exprs}]) + EnumerableLimit(fetch=[5]) + EnumerableSort(sort0=[$0], dir0=[DESC]) + EnumerableCalc(expr#0..9=[{inputs}], expr#10=[Sarg['E':VARCHAR, 'F':VARCHAR]:VARCHAR], expr#11=[SEARCH($t1, $t10)], proj#0..9=[{exprs}], $condition=[$t11]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data2]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/indexDefinitions/locations_type_conflict_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/locations_type_conflict_index_mapping.json new file mode 100644 index 00000000000..98a1bfd07db --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/locations_type_conflict_index_mapping.json @@ -0,0 +1,15 @@ +{ + "mappings": { + "properties": { + "description": { + "type": "text" + }, + "age": { + "type": "text" + }, + "place_id": { + "type": "integer" + } + } + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/locations.json b/integ-test/src/test/resources/locations.json deleted file mode 100644 index 642370dcc34..00000000000 --- a/integ-test/src/test/resources/locations.json +++ /dev/null @@ -1,4 +0,0 @@ -{"index":{"_id":"1"}} -{"description":"square","place":{"type": "Polygon","coordinates": [[ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],[100.0, 1.0], [100.0, 0.0]]]},"center":{"lat": 0.5, "lon": 100.5 }} -{"index":{"_id":"2"}} -{"description":"bigSquare","place":{"type": "Polygon","coordinates": [[ [100.0, 0.0], [110.0, 0.0], [110.0, 10.0],[100.0, 10.0], [100.0, 0.0]]]},"center":{"lat": 5.0, "lon": 105.0 }} diff --git a/integ-test/src/test/resources/locations_type_conflict.json b/integ-test/src/test/resources/locations_type_conflict.json new file mode 100644 index 00000000000..fd164b3e21c --- /dev/null +++ b/integ-test/src/test/resources/locations_type_conflict.json @@ -0,0 +1,20 @@ +{"index":{"_id":"1"}} +{"description":"Central Park","age":"old","place_id":1001} +{"index":{"_id":"2"}} +{"description":"Times Square","age":"modern","place_id":1002} +{"index":{"_id":"3"}} +{"description":"Brooklyn Bridge","age":"historic","place_id":1003} +{"index":{"_id":"4"}} +{"description":"Empire State Building","age":"1931","place_id":1004} +{"index":{"_id":"5"}} +{"description":"Statue of Liberty","age":"1886","place_id":1005} +{"index":{"_id":"6"}} +{"description":"Grand Central Terminal","age":"vintage","place_id":1006} +{"index":{"_id":"7"}} +{"description":"One World Trade Center","age":"new","place_id":1007} +{"index":{"_id":"8"}} +{"description":"Madison Square Garden","age":"recent","place_id":1008} +{"index":{"_id":"9"}} +{"description":"Wall Street","age":"colonial","place_id":1009} +{"index":{"_id":"10"}} +{"description":"Fifth Avenue","age":"19th century","place_id":1010} diff --git a/integ-test/src/test/resources/time_test_data2.json b/integ-test/src/test/resources/time_test_data2.json new file mode 100644 index 00000000000..db92260798c --- /dev/null +++ b/integ-test/src/test/resources/time_test_data2.json @@ -0,0 +1,40 @@ +{"index":{"_id":"1"}} +{"timestamp":"2025-08-01T04:00:00","value":2001,"category":"E","@timestamp":"2025-08-01T04:00:00"} +{"index":{"_id":"2"}} +{"timestamp":"2025-08-01T02:30:00","value":2002,"category":"F","@timestamp":"2025-08-01T02:30:00"} +{"index":{"_id":"3"}} +{"timestamp":"2025-08-01T01:00:00","value":2003,"category":"E","@timestamp":"2025-08-01T01:00:00"} +{"index":{"_id":"4"}} +{"timestamp":"2025-07-31T22:15:00","value":2004,"category":"F","@timestamp":"2025-07-31T22:15:00"} +{"index":{"_id":"5"}} +{"timestamp":"2025-07-31T20:45:00","value":2005,"category":"E","@timestamp":"2025-07-31T20:45:00"} +{"index":{"_id":"6"}} +{"timestamp":"2025-07-31T18:30:00","value":2006,"category":"F","@timestamp":"2025-07-31T18:30:00"} +{"index":{"_id":"7"}} +{"timestamp":"2025-07-31T16:00:00","value":2007,"category":"E","@timestamp":"2025-07-31T16:00:00"} +{"index":{"_id":"8"}} +{"timestamp":"2025-07-31T14:15:00","value":2008,"category":"F","@timestamp":"2025-07-31T14:15:00"} +{"index":{"_id":"9"}} +{"timestamp":"2025-07-31T12:30:00","value":2009,"category":"E","@timestamp":"2025-07-31T12:30:00"} +{"index":{"_id":"10"}} +{"timestamp":"2025-07-31T10:45:00","value":2010,"category":"F","@timestamp":"2025-07-31T10:45:00"} +{"index":{"_id":"11"}} +{"timestamp":"2025-07-31T08:00:00","value":2011,"category":"E","@timestamp":"2025-07-31T08:00:00"} +{"index":{"_id":"12"}} +{"timestamp":"2025-07-31T06:15:00","value":2012,"category":"F","@timestamp":"2025-07-31T06:15:00"} +{"index":{"_id":"13"}} +{"timestamp":"2025-07-31T04:30:00","value":2013,"category":"E","@timestamp":"2025-07-31T04:30:00"} +{"index":{"_id":"14"}} +{"timestamp":"2025-07-31T02:45:00","value":2014,"category":"F","@timestamp":"2025-07-31T02:45:00"} +{"index":{"_id":"15"}} +{"timestamp":"2025-07-31T01:00:00","value":2015,"category":"E","@timestamp":"2025-07-31T01:00:00"} +{"index":{"_id":"16"}} +{"timestamp":"2025-07-30T23:15:00","value":2016,"category":"F","@timestamp":"2025-07-30T23:15:00"} +{"index":{"_id":"17"}} +{"timestamp":"2025-07-30T21:30:00","value":2017,"category":"E","@timestamp":"2025-07-30T21:30:00"} +{"index":{"_id":"18"}} +{"timestamp":"2025-07-30T19:45:00","value":2018,"category":"F","@timestamp":"2025-07-30T19:45:00"} +{"index":{"_id":"19"}} +{"timestamp":"2025-07-30T18:00:00","value":2019,"category":"E","@timestamp":"2025-07-30T18:00:00"} +{"index":{"_id":"20"}} +{"timestamp":"2025-07-30T16:15:00","value":2020,"category":"F","@timestamp":"2025-07-30T16:15:00"} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 96ca6c4d835..230297b21c0 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -125,6 +125,7 @@ TIME_ZONE: 'TIME_ZONE'; TRAINING_DATA_SIZE: 'TRAINING_DATA_SIZE'; ANOMALY_SCORE_THRESHOLD: 'ANOMALY_SCORE_THRESHOLD'; APPEND: 'APPEND'; +MULTISEARCH: 'MULTISEARCH'; COUNTFIELD: 'COUNTFIELD'; SHOWCOUNT: 'SHOWCOUNT'; LIMIT: 'LIMIT'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 3ee0c568b41..d702d366ae5 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -20,7 +20,7 @@ pplStatement ; queryStatement - : pplCommands (PIPE commands)* + : (PIPE)? pplCommands (PIPE commands)* ; explainStatement @@ -43,6 +43,7 @@ pplCommands : describeCommand | showDataSourcesCommand | searchCommand + | multisearchCommand ; commands @@ -114,6 +115,7 @@ commandName | REVERSE | REGEX | APPEND + | MULTISEARCH | REX ; @@ -460,6 +462,10 @@ appendCommand : APPEND LT_SQR_PRTHS searchCommand? (PIPE commands)* RT_SQR_PRTHS ; +multisearchCommand + : MULTISEARCH (LT_SQR_PRTHS subSearch RT_SQR_PRTHS)+ + ; + kmeansCommand : KMEANS (kmeansParameter)* ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 38d070427f8..573e165a8fc 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -66,6 +66,7 @@ import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.MinSpanBin; +import org.opensearch.sql.ast.tree.Multisearch; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -87,6 +88,7 @@ import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Window; +import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.common.utils.StringUtils; @@ -1021,6 +1023,26 @@ public UnresolvedPlan visitAppendCommand(OpenSearchPPLParser.AppendCommandContex return new Append(subsearch); } + @Override + public UnresolvedPlan visitMultisearchCommand(OpenSearchPPLParser.MultisearchCommandContext ctx) { + List subsearches = new ArrayList<>(); + + // Process each subsearch + for (OpenSearchPPLParser.SubSearchContext subsearchCtx : ctx.subSearch()) { + // Use the existing visitSubSearch logic + UnresolvedPlan fullSubsearch = visitSubSearch(subsearchCtx); + subsearches.add(fullSubsearch); + } + + // Validate minimum number of subsearches + if (subsearches.size() < 2) { + throw new SyntaxCheckException( + "Multisearch command requires at least two subsearches. Provided: " + subsearches.size()); + } + + return new Multisearch(subsearches); + } + @Override public UnresolvedPlan visitRexCommand(OpenSearchPPLParser.RexCommandContext ctx) { UnresolvedExpression field = internalVisitExpression(ctx.rexExpr().field); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index eeca282cb9d..dac64e8b1bb 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -69,6 +69,7 @@ import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.MinSpanBin; +import org.opensearch.sql.ast.tree.Multisearch; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Patterns; import org.opensearch.sql.ast.tree.Project; @@ -580,6 +581,36 @@ public String visitAppend(Append node, String context) { return StringUtils.format("%s | append [%s ]", child, subsearch); } + @Override + public String visitMultisearch(Multisearch node, String context) { + List anonymizedSubsearches = new ArrayList<>(); + + for (UnresolvedPlan subsearch : node.getSubsearches()) { + String anonymizedSubsearch = anonymizeData(subsearch); + anonymizedSubsearch = "search " + anonymizedSubsearch; + anonymizedSubsearch = + anonymizedSubsearch + .replaceAll("\\bsource=\\w+", "source=table") // Replace table names after source= + .replaceAll( + "\\b(?!source|fields|where|stats|head|tail|sort|eval|rename|multisearch|search|table|identifier|\\*\\*\\*)\\w+(?=\\s*[<>=!])", + "identifier") // Replace field names before operators + .replaceAll( + "\\b(?!source|fields|where|stats|head|tail|sort|eval|rename|multisearch|search|table|identifier|\\*\\*\\*)\\w+(?=\\s*,)", + "identifier") // Replace field names before commas + .replaceAll( + "fields" + + " \\+\\s*\\b(?!source|fields|where|stats|head|tail|sort|eval|rename|multisearch|search|table|identifier|\\*\\*\\*)\\w+", + "fields + identifier") // Replace field names after 'fields +' + .replaceAll( + "fields" + + " \\+\\s*identifier,\\s*\\b(?!source|fields|where|stats|head|tail|sort|eval|rename|multisearch|search|table|identifier|\\*\\*\\*)\\w+", + "fields + identifier,identifier"); // Handle multiple fields + anonymizedSubsearches.add(StringUtils.format("[%s]", anonymizedSubsearch)); + } + + return StringUtils.format("| multisearch %s", String.join(" ", anonymizedSubsearches)); + } + @Override public String visitValues(Values node, String context) { // In case legacy SQL relies on it, return empty to fail open anyway. diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java new file mode 100644 index 00000000000..e69030753f2 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java @@ -0,0 +1,373 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.sql.Timestamp; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLMultisearchTest extends CalcitePPLAbstractTest { + + public CalcitePPLMultisearchTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + + // Add timestamp tables for multisearch testing with format matching time_test_data.json + ImmutableList timeData1 = + ImmutableList.of( + new Object[] { + Timestamp.valueOf("2025-08-01 03:47:41"), + 8762, + "A", + Timestamp.valueOf("2025-08-01 03:47:41") + }, + new Object[] { + Timestamp.valueOf("2025-08-01 01:14:11"), + 9015, + "B", + Timestamp.valueOf("2025-08-01 01:14:11") + }, + new Object[] { + Timestamp.valueOf("2025-07-31 23:40:33"), + 8676, + "A", + Timestamp.valueOf("2025-07-31 23:40:33") + }, + new Object[] { + Timestamp.valueOf("2025-07-31 21:07:03"), + 8490, + "B", + Timestamp.valueOf("2025-07-31 21:07:03") + }); + + ImmutableList timeData2 = + ImmutableList.of( + new Object[] { + Timestamp.valueOf("2025-08-01 04:00:00"), + 2001, + "E", + Timestamp.valueOf("2025-08-01 04:00:00") + }, + new Object[] { + Timestamp.valueOf("2025-08-01 02:30:00"), + 2002, + "F", + Timestamp.valueOf("2025-08-01 02:30:00") + }, + new Object[] { + Timestamp.valueOf("2025-08-01 01:00:00"), + 2003, + "E", + Timestamp.valueOf("2025-08-01 01:00:00") + }, + new Object[] { + Timestamp.valueOf("2025-07-31 22:15:00"), + 2004, + "F", + Timestamp.valueOf("2025-07-31 22:15:00") + }); + + schema.add("TIME_DATA1", new TimeDataTable(timeData1)); + schema.add("TIME_DATA2", new TimeDataTable(timeData2)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testBasicMultisearch() { + String ppl = + "| multisearch " + + "[search source=EMP | where DEPTNO = 10] " + + "[search source=EMP | where DEPTNO = 20]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalFilter(condition=[=($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 10\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20"; + verifyPPLToSparkSQL(root, expectedSparkSql); + verifyResultCount(root, 8); + } + + @Test + public void testMultisearchCrossIndices() { + // Test multisearch with different tables (indices) + String ppl = + "| multisearch [search source=EMP | where DEPTNO = 10 | fields EMPNO, ENAME," + + " DEPTNO] [search source=DEPT | where DEPTNO = 10 | fields DEPTNO, DNAME | eval EMPNO" + + " = DEPTNO, ENAME = DNAME]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[$7], DEPTNO0=[null:TINYINT]," + + " DNAME=[null:VARCHAR(14)], EMPNO0=[null:TINYINT], ENAME0=[null:VARCHAR(14)])\n" + + " LogicalFilter(condition=[=($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " DEPTNO=[null:TINYINT], DEPTNO0=[$0], DNAME=[$1], EMPNO0=[$0], ENAME0=[$1])\n" + + " LogicalFilter(condition=[=($0, 10)])\n" + + " LogicalTableScan(table=[[scott, DEPT]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `EMPNO`, `ENAME`, `DEPTNO`, CAST(NULL AS TINYINT) `DEPTNO0`, CAST(NULL AS STRING)" + + " `DNAME`, CAST(NULL AS TINYINT) `EMPNO0`, CAST(NULL AS STRING) `ENAME0`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 10\n" + + "UNION ALL\n" + + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, CAST(NULL AS" + + " TINYINT) `DEPTNO`, `DEPTNO` `DEPTNO0`, `DNAME`, `DEPTNO` `EMPNO0`, `DNAME`" + + " `ENAME0`\n" + + "FROM `scott`.`DEPT`\n" + + "WHERE `DEPTNO` = 10"; + verifyPPLToSparkSQL(root, expectedSparkSql); + verifyResultCount(root, 4); // 3 employees + 1 department + } + + @Test + public void testMultisearchWithStats() { + String ppl = + "| multisearch " + + "[search source=EMP | where DEPTNO = 10 | eval type = \"accounting\"] " + + "[search source=EMP | where DEPTNO = 20 | eval type = \"research\"] " + + "| stats count by type"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(count=[$1], type=[$0])\n" + + " LogicalAggregate(group=[{0}], count=[COUNT()])\n" + + " LogicalProject(type=[$8])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], type=['accounting':VARCHAR])\n" + + " LogicalFilter(condition=[=($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], type=['research':VARCHAR])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT COUNT(*) `count`, `type`\n" + + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + + " 'accounting' `type`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 10\n" + + "UNION ALL\n" + + "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + + " 'research' `type`\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20) `t3`\n" + + "GROUP BY `type`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + verifyResultCount(root, 2); + } + + @Test + public void testMultisearchThreeSubsearches() { + String ppl = + "| multisearch " + + "[search source=EMP | where DEPTNO = 10] " + + "[search source=EMP | where DEPTNO = 20] " + + "[search source=EMP | where DEPTNO = 30]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalFilter(condition=[=($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 30)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 10\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 30"; + verifyPPLToSparkSQL(root, expectedSparkSql); + verifyResultCount(root, 14); + } + + // ======================================================================== + // Timestamp Interleaving Tests + // ======================================================================== + + @Test + public void testMultisearchTimestampInterleaving() { + String ppl = + "| multisearch " + + "[search source=TIME_DATA1 | where category IN (\"A\", \"B\")] " + + "[search source=TIME_DATA2 | where category IN (\"E\", \"F\")] " + + "| head 6"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalSort(sort0=[$3], dir0=[DESC], fetch=[6])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalFilter(condition=[SEARCH($2, Sarg['A':VARCHAR, 'B':VARCHAR]:VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, TIME_DATA1]])\n" + + " LogicalFilter(condition=[SEARCH($2, Sarg['E':VARCHAR, 'F':VARCHAR]:VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, TIME_DATA2]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT *\n" + + "FROM (SELECT *\n" + + "FROM `scott`.`TIME_DATA1`\n" + + "WHERE `category` IN ('A', 'B')\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`TIME_DATA2`\n" + + "WHERE `category` IN ('E', 'F'))\n" + + "ORDER BY `@timestamp` DESC NULLS FIRST\n" + + "LIMIT 6"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMultisearchWithTimestampFiltering() { + String ppl = + "| multisearch " + + "[search source=TIME_DATA1 | where @timestamp > \"2025-07-31 23:00:00\"] " + + "[search source=TIME_DATA2 | where @timestamp > \"2025-07-31 23:00:00\"] " + + "| sort @timestamp desc"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalSort(sort0=[$3], dir0=[DESC-nulls-last])\n" + + " LogicalSort(sort0=[$3], dir0=[DESC])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalFilter(condition=[>($3, TIMESTAMP('2025-07-31 23:00:00':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, TIME_DATA1]])\n" + + " LogicalFilter(condition=[>($3, TIMESTAMP('2025-07-31 23:00:00':VARCHAR))])\n" + + " LogicalTableScan(table=[[scott, TIME_DATA2]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT *\n" + + "FROM (SELECT `timestamp`, `value`, `category`, `@timestamp`\n" + + "FROM (SELECT *\n" + + "FROM `scott`.`TIME_DATA1`\n" + + "WHERE `@timestamp` > `TIMESTAMP`('2025-07-31 23:00:00')\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`TIME_DATA2`\n" + + "WHERE `@timestamp` > `TIMESTAMP`('2025-07-31 23:00:00'))\n" + + "ORDER BY `@timestamp` DESC NULLS FIRST) `t2`\n" + + "ORDER BY `@timestamp` DESC"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + // ======================================================================== + // Custom Table Implementation for Timestamp Testing + // ======================================================================== + + /** Custom table implementation with timestamp fields for multisearch testing. */ + @RequiredArgsConstructor + static class TimeDataTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("timestamp", SqlTypeName.TIMESTAMP) + .nullable(true) + .add("value", SqlTypeName.INTEGER) + .nullable(true) + .add("category", SqlTypeName.VARCHAR) + .nullable(true) + .add("@timestamp", SqlTypeName.TIMESTAMP) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 99614657b1e..1d3d4a9fe9f 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -1106,4 +1106,108 @@ public void testRexSedModeWithOffsetFieldThrowsException() { // Test that SED mode and offset_field cannot be used together (align with Splunk behavior) plan("source=test | rex field=email mode=sed offset_field=matchpos \"s/@.*/@company.com/\""); } + + // Multisearch tests + + @Test + public void testBasicMultisearchParsing() { + // Test basic multisearch parsing + plan("| multisearch [ search source=test1 ] [ search source=test2 ]"); + } + + @Test + public void testMultisearchWithStreamingCommands() { + // Test multisearch with streaming commands + plan( + "| multisearch [ search source=test1 | where age > 30 | fields name, age ] " + + "[ search source=test2 | eval category=\"young\" | rename id as user_id ]"); + } + + @Test + public void testMultisearchWithStatsCommand() { + // Test multisearch with stats command - now allowed + plan( + "| multisearch [ search source=test1 | stats count() by gender ] " + + "[ search source=test2 | fields name, age ]"); + } + + @Test + public void testMultisearchWithSortCommand() { + // Test multisearch with sort command - now allowed + plan( + "| multisearch [ search source=test1 | sort age ] " + + "[ search source=test2 | fields name, age ]"); + } + + @Test + public void testMultisearchWithBinCommand() { + // Test multisearch with bin command - now allowed + plan( + "| multisearch [ search source=test1 | bin age span=10 ] " + + "[ search source=test2 | fields name, age ]"); + } + + @Test + public void testMultisearchWithTimechartCommand() { + // Test multisearch with timechart command - now allowed + plan( + "| multisearch [ search source=test1 | timechart count() by age ] " + + "[ search source=test2 | fields name, age ]"); + } + + @Test + public void testMultisearchWithRareCommand() { + // Test multisearch with rare command - now allowed + plan( + "| multisearch [ search source=test1 | rare gender ] " + + "[ search source=test2 | fields name, age ]"); + } + + @Test + public void testMultisearchWithDedupeCommand() { + // Test multisearch with dedup command - now allowed + plan( + "| multisearch [ search source=test1 | dedup name ] " + + "[ search source=test2 | fields name, age ]"); + } + + @Test + public void testMultisearchWithJoinCommand() { + // Test multisearch with join command - now allowed + plan( + "| multisearch [ search source=test1 | join left=l right=r where l.id = r.id" + + " test2 ] [ search source=test3 | fields name, age ]"); + } + + @Test + public void testMultisearchWithComplexPipeline() { + // Test multisearch with complex pipeline (previously called streaming) + plan( + "| multisearch [ search source=test1 | where age > 30 | eval category=\"adult\"" + + " | fields name, age, category | rename age as years_old | head 100 ] [ search" + + " source=test2 | where status=\"active\" | expand tags | flatten nested_data |" + + " fillnull with \"unknown\" | reverse ]"); + } + + @Test + public void testMultisearchMixedCommands() { + // Test multisearch with mix of commands - now all allowed + plan( + "| multisearch [ search source=test1 | where age > 30 | stats count() ] " + + "[ search source=test2 | where status=\"active\" | sort name ]"); + } + + @Test + public void testMultisearchSingleSubsearchThrowsException() { + // Test multisearch with only one subsearch - should throw descriptive runtime exception + SyntaxCheckException exception = + assertThrows( + SyntaxCheckException.class, + () -> plan("| multisearch [ search source=test1 | fields name, age ]")); + + // Now we should get our descriptive runtime validation error message + assertEquals( + "Multisearch command requires at least two subsearches. Provided: 1", + exception.getMessage()); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 07bc711056a..4184d8211d9 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -686,6 +686,30 @@ public void testRexWithOffsetField() { anonymize("source=t | rex field=message \"(?[a-z]+)\" offset_field=pos")); } + @Test + public void testMultisearch() { + assertEquals( + "| multisearch [search source=table | where identifier < ***] [search" + + " source=table | where identifier >= ***]", + anonymize( + "| multisearch [search source=accounts | where age < 30] [search" + + " source=accounts | where age >= 30]")); + + assertEquals( + "| multisearch [search source=table | where identifier > ***] [search" + + " source=table | where identifier = ***]", + anonymize( + "| multisearch [search source=accounts | where balance > 20000]" + + " [search source=accounts | where state = 'CA']")); + + assertEquals( + "| multisearch [search source=table | fields + identifier,identifier] [search" + + " source=table | where identifier = ***]", + anonymize( + "| multisearch [search source=accounts | fields firstname, lastname]" + + " [search source=accounts | where age = 25]")); + } + private String anonymize(String query) { AstBuilder astBuilder = new AstBuilder(query, settings); return anonymize(astBuilder.visit(parser.parse(query)));