From 6859fbe11eef901c4d6214c01b5289a4a40b7096 Mon Sep 17 00:00:00 2001 From: Xinyuan Lu Date: Wed, 12 Nov 2025 14:12:49 +0800 Subject: [PATCH 1/2] Support `appendpipe`command in PPL (#4602) * add demos Signed-off-by: xinyual * add missing column Signed-off-by: xinyual * add appendpipe poc Signed-off-by: xinyual * slighty change syntax Signed-off-by: xinyual * add unresolved plan Signed-off-by: xinyual * add IT Signed-off-by: xinyual * add tests Signed-off-by: xinyual * remove useless ut Signed-off-by: xinyual * fix conflict Signed-off-by: xinyual * remove useless code Signed-off-by: xinyual * remove useless code Signed-off-by: xinyual * remove useless code Signed-off-by: xinyual * apply spotless Signed-off-by: xinyual * remove useless chaneg Signed-off-by: xinyual * add explain IT Signed-off-by: xinyual * fix IT Signed-off-by: xinyual * apply spotless Signed-off-by: xinyual * add doc Signed-off-by: xinyual * optimize doc Signed-off-by: xinyual * add UT Signed-off-by: xinyual * fix IT due to performance change Signed-off-by: xinyual * add multiply children check Signed-off-by: xinyual --------- Signed-off-by: xinyual --- .../org/opensearch/sql/analysis/Analyzer.java | 6 ++ .../sql/ast/AbstractNodeVisitor.java | 5 ++ .../org/opensearch/sql/ast/dsl/AstDSL.java | 6 ++ .../opensearch/sql/ast/tree/AppendPipe.java | 45 ++++++++++ .../sql/calcite/CalciteRelNodeVisitor.java | 29 +++++- docs/user/ppl/cmd/appendpipe.rst | 72 +++++++++++++++ .../sql/calcite/remote/CalciteExplainIT.java | 12 +++ .../remote/CalcitePPLAppendPipeCommandIT.java | 90 +++++++++++++++++++ .../calcite/explain_appendpipe_command.json | 6 ++ .../explain_appendpipe_command.json | 6 ++ ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 1 + ppl/src/main/antlr/OpenSearchPPLParser.g4 | 10 +++ .../opensearch/sql/ppl/parser/AstBuilder.java | 17 ++++ .../sql/ppl/utils/PPLQueryDataAnonymizer.java | 14 +++ .../ppl/calcite/CalcitePPLAppendPipeTest.java | 62 +++++++++++++ .../sql/ppl/parser/AstBuilderTest.java | 15 ++++ 
.../ppl/utils/PPLQueryDataAnonymizerTest.java | 13 +++ 17 files changed, 408 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java create mode 100644 docs/user/ppl/cmd/appendpipe.rst create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json create mode 100644 integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json create mode 100644 ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 7cb3a1e48bc..cf31ef77458 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -60,6 +60,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CloseCursor; @@ -832,6 +833,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) { throw getOnlyForCalciteException("Appendcol"); } + @Override + public LogicalPlan visitAppendPipe(AppendPipe node, AnalysisContext context) { + throw getOnlyForCalciteException("AppendPipe"); + } + @Override public LogicalPlan visitAppend(Append node, AnalysisContext context) { throw getOnlyForCalciteException("Append"); diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 99dc015c449..7595d92f352 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ 
b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -48,6 +48,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CloseCursor; @@ -141,6 +142,10 @@ public T visitSearch(Search node, C context) { return visitChildren(node, context); } + public T visitAppendPipe(AppendPipe node, C context) { + return visitChildren(node, context); + } + public T visitFilter(Filter node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index e97b72be9d6..1fda1bfc58c 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -49,6 +49,7 @@ import org.opensearch.sql.ast.expression.WindowFunction; import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.tree.Aggregation; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; @@ -563,6 +564,11 @@ public static Trendline trendline( return new Trendline(sortField, Arrays.asList(computations)).attach(input); } + public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquery) { + + return new AppendPipe(subquery).attach(input); + } + public static Trendline.TrendlineComputation computation( Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) { return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type); diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java new file mode 
100644 index 00000000000..0ea1cb9b453 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; + +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +public class AppendPipe extends UnresolvedPlan { + + private UnresolvedPlan subQuery; + + private UnresolvedPlan child; + + public AppendPipe(UnresolvedPlan subQuery) { + this.subQuery = subQuery; + } + + @Override + public AppendPipe attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitAppendPipe(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 075ea8571a7..78aebb05184 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -105,6 +105,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CloseCursor; @@ -247,6 +248,28 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { return context.relBuilder.peek(); } + @Override + public RelNode 
visitAppendPipe(AppendPipe node, CalcitePlanContext context) { + visitChildren(node, context); + UnresolvedPlan subqueryPlan = node.getSubQuery(); + UnresolvedPlan childNode = subqueryPlan; + while (childNode.getChild() != null + && !childNode.getChild().isEmpty() + && !(childNode.getChild().getFirst() instanceof Values)) { + if (childNode.getChild().size() > 1) { + throw new RuntimeException("AppendPipe doesn't support multiply children subquery."); + } + childNode = (UnresolvedPlan) childNode.getChild().getFirst(); + } + childNode.attach(node.getChild().getFirst()); + + subqueryPlan.accept(this, context); + + RelNode subPipelineNode = context.relBuilder.build(); + RelNode mainNode = context.relBuilder.build(); + return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context); + } + @Override public RelNode visitRegex(Regex node, CalcitePlanContext context) { visitChildren(node, context); @@ -2137,9 +2160,13 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) { // 3. Merge two query schemas using shared logic RelNode subsearchNode = context.relBuilder.build(); RelNode mainNode = context.relBuilder.build(); + return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context); + } + private RelNode mergeTableAndResolveColumnConflict( + RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) { // Use shared schema merging logic that handles type conflicts via field renaming - List nodesToMerge = Arrays.asList(mainNode, subsearchNode); + List nodesToMerge = Arrays.asList(mainNode, subqueryNode); List projectedNodes = SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); diff --git a/docs/user/ppl/cmd/appendpipe.rst b/docs/user/ppl/cmd/appendpipe.rst new file mode 100644 index 00000000000..43c4dd1e84d --- /dev/null +++ b/docs/user/ppl/cmd/appendpipe.rst @@ -0,0 +1,72 @@ +========= +appendpipe +========= + +.. rubric:: Table of contents + +.. 
contents:: + :local: + :depth: 2 + + +Description +============ +| Use the ``appendpipe`` command to append the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first. The subpipeline is run when the search reaches the ``appendpipe`` command. +The command aligns columns with the same field names and types. For fields that appear in only one of the main search and the subpipeline, NULL values are filled in for the rows that lack them. + +Version +======= +3.3.0 + +Syntax +============ +appendpipe [<subpipeline>] + +* subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command. + +Example 1: Append rows from a total count to existing search result +==================================================================================== + +This example appends rows from "total by gender" to "sum by gender, state", merging columns that have the same field name and type. + +PPL query:: + + os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ]; + fetched rows / total rows = 6/6 + +------+--------+-------+-------+ + | part | gender | state | total | + |------+--------+-------+-------| + | 36 | M | TN | null | + | 33 | M | MD | null | + | 32 | M | IL | null | + | 28 | F | VA | null | + | null | F | null | 28 | + | null | M | null | 101 | + +------+--------+-------+-------+ + + + +Example 2: Append rows with merged column names +=============================================================== + +This example appends rows from "total by gender" to "sum by gender, state"; because both results name the column ``total``, the values are merged into a single column. 
+ +PPL query:: + + os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ]; + fetched rows / total rows = 6/6 + +----------+--------+-------+ + | total | gender | state | + |----------+--------+-------| + | 36 | M | TN | + | 33 | M | MD | + | 32 | M | IL | + | 28 | F | VA | + | 28 | F | null | + | 101 | M | null | + +----------+--------+-------+ + +Limitations +=========== + +* **Schema Compatibility**: Same as command ``append``, when fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns). diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 9a7f425b1c3..2fbd16738f8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -895,6 +895,18 @@ public void testExplainAppendCommand() throws IOException { TEST_INDEX_BANK))); } + @Test + public void testExplainAppendPipeCommand() throws IOException { + String expected = loadExpectedPlan("explain_appendpipe_command.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + Locale.ROOT, + "source=%s | appendpipe [ stats count(balance) as cnt by gender ]", + TEST_INDEX_BANK))); + } + @Test public void testMvjoinExplain() throws IOException { String query = diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java new file mode 100644 
index 00000000000..d25d3ca80db --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; + +import java.io.IOException; +import java.util.Locale; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase { + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + } + + @Test + public void testAppendPipe() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ " + + " sort -sum_age_by_gender ] |" + + " head 5", + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F")); + } + + @Test + public void testAppendDifferentIndex() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" + + " sum(age) as bank_sum_age ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_BANK)); + verifySchemaInOrder( + actual, + schema("sum", "bigint"), + schema("gender", "string"), + schema("bank_sum_age", "bigint")); + 
verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238)); + } + + @Test + public void testAppendpipeWithMergedColumn() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender |" + + " appendpipe [ stats sum(sum) as sum ] | head 5", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null)); + } + + @Test + public void testAppendpipeWithConflictTypeColumn() throws IOException { + Exception exception = + assertThrows( + Exception.class, + () -> + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum =" + + " cast(sum as double) ] | head 5", + TEST_INDEX_ACCOUNT))); + assertTrue(exception.getMessage().contains("due to incompatible types")); + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json new file mode 100644 index 00000000000..6ec42972a10 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n 
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], 
PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json new file mode 100644 index 00000000000..2b111e119db --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], 
male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n" + } +} \ No newline at end of file diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index bd0fe3ab384..896e09ae57b 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -62,6 +62,7 @@ BUFFER_LIMIT: 'BUFFER_LIMIT'; LABEL: 'LABEL'; SHOW_NUMBERED_TOKEN: 'SHOW_NUMBERED_TOKEN'; AGGREGATION: 'AGGREGATION'; +APPENDPIPE: 'APPENDPIPE'; //Native JOIN KEYWORDS JOIN: 'JOIN'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 0f702998be5..778cbeb38a0 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -19,6 +19,10 @@ pplStatement | queryStatement ; +subPipeline + : PIPE? commands (PIPE commands)* + ; + queryStatement : (PIPE)? 
pplCommands (PIPE commands)* ; @@ -80,6 +84,7 @@ commands | chartCommand | timechartCommand | rexCommand + | appendPipeCommand | replaceCommand ; @@ -124,6 +129,7 @@ commandName | APPEND | MULTISEARCH | REX + | APPENDPIPE | REPLACE ; @@ -224,6 +230,10 @@ statsCommand : STATS statsArgs statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (dedupSplitArg)? ; +appendPipeCommand + : APPENDPIPE LT_SQR_PRTHS subPipeline RT_SQR_PRTHS + ; + statsArgs : (partitionsArg | allnumArg | delimArg | bucketNullableArg)* ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 6243487112b..8bf0ec6be20 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -73,6 +73,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; @@ -165,6 +166,16 @@ public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementCont .reduce(pplCommand, (r, e) -> e.attach(e instanceof Join ? 
projectExceptMeta(r) : r)); } + @Override + public UnresolvedPlan visitSubPipeline(OpenSearchPPLParser.SubPipelineContext ctx) { + List cmds = ctx.commands(); + if (cmds.isEmpty()) { + throw new IllegalArgumentException("appendpipe [] is empty"); + } + UnresolvedPlan seed = visit(cmds.getFirst()); + return cmds.stream().skip(1).map(this::visit).reduce(seed, (left, op) -> op.attach(left)); + } + @Override public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) { UnresolvedPlan searchCommand = visit(ctx.searchCommand()); @@ -235,6 +246,12 @@ public UnresolvedPlan visitWhereCommand(WhereCommandContext ctx) { return new Filter(internalVisitExpression(ctx.logicalExpression())); } + @Override + public UnresolvedPlan visitAppendPipeCommand(OpenSearchPPLParser.AppendPipeCommandContext ctx) { + UnresolvedPlan plan = visit(ctx.subPipeline()); + return new AppendPipe(plan); + } + @Override public UnresolvedPlan visitJoinCommand(OpenSearchPPLParser.JoinCommandContext ctx) { // a sql-like syntax if join criteria existed diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index baf55418891..0d0cac7eebc 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -55,6 +55,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CountBin; @@ -714,6 +715,19 @@ private String visitExpression(UnresolvedExpression expression) { return expressionAnalyzer.analyze(expression, null); } + @Override + public String visitAppendPipe(AppendPipe node, String context) { + Values emptyValue = new 
Values(null); + UnresolvedPlan childNode = node.getSubQuery(); + while (childNode != null && !childNode.getChild().isEmpty()) { + childNode = (UnresolvedPlan) childNode.getChild().get(0); + } + childNode.attach(emptyValue); + String child = node.getChild().get(0).accept(this, context); + String subPipeline = anonymizeData(node.getSubQuery()); + return StringUtils.format("%s | appendpipe [%s]", child, subPipeline); + } + @Override public String visitFillNull(FillNull node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java new file mode 100644 index 00000000000..faf944da4a0 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +public class CalcitePPLAppendPipeTest extends CalcitePPLAbstractTest { + public CalcitePPLAppendPipeTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testAppendPipe() { + String ppl = "source=EMP | appendpipe [ where DEPTNO = 20 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test 
+ public void testAppendPipeWithMergedColumns() { + String ppl = + "source=EMP | fields DEPTNO | appendpipe [ fields DEPTNO | eval DEPTNO_PLUS =" + + " DEPTNO + 10 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[null:INTEGER])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[+($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 28); + + String expectedSparkSql = + "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT `DEPTNO`, `DEPTNO` + 10 `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 1ace4613e68..9b68e6bc04c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -13,6 +13,7 @@ import static org.opensearch.sql.ast.dsl.AstDSL.agg; import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; import static org.opensearch.sql.ast.dsl.AstDSL.alias; +import static org.opensearch.sql.ast.dsl.AstDSL.appendPipe; import static org.opensearch.sql.ast.dsl.AstDSL.argument; import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.compare; @@ -1003,6 +1004,20 @@ public void testFillNullValueWithFields() { fillNull(relation("t"), intLiteral(0), true, field("a"), field("b"), field("c"))); } + @Test + public void testAppendPipe() { + assertEqual( + "source=t | appendpipe [ stats COUNT() ]", + appendPipe( + relation("t"), + agg( + null, + exprList(alias("COUNT()", aggregate("count", AstDSL.allFields()))), + emptyList(), + emptyList(), + 
defaultStatsArgs()))); + } + public void testTrendline() { assertEqual( "source=t | trendline sma(5, test_field) as test_field_alias sma(1, test_field_2) as" diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 23c34042c5c..371f091e2ec 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -753,6 +753,19 @@ public void testRegex() { anonymize("source=t | regex email='.*@domain.com' | fields email")); } + @Test + public void testAppendPipe() { + assertEquals( + "source=table | appendpipe [ | stats count()]", + anonymize("source=t | appendpipe [stats count()]")); + assertEquals( + "source=table | appendpipe [ | where identifier = ***]", + anonymize("source=t | appendpipe [where fieldname=='pattern']")); + assertEquals( + "source=table | appendpipe [ | sort identifier]", + anonymize("source=t | appendpipe [sort fieldname]")); + } + @Test public void testRexCommand() { when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10); From 9ef12b4324083f21fbc9cf9ea47f38ca7aa9af39 Mon Sep 17 00:00:00 2001 From: xinyual Date: Wed, 12 Nov 2025 14:55:15 +0800 Subject: [PATCH 2/2] avoid to use get first Signed-off-by: xinyual --- .../org/opensearch/sql/calcite/CalciteRelNodeVisitor.java | 6 +++--- .../main/java/org/opensearch/sql/ppl/parser/AstBuilder.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 78aebb05184..21ba2f40bc9 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -255,13 +255,13 @@ public RelNode 
visitAppendPipe(AppendPipe node, CalcitePlanContext context) { UnresolvedPlan childNode = subqueryPlan; while (childNode.getChild() != null && !childNode.getChild().isEmpty() - && !(childNode.getChild().getFirst() instanceof Values)) { + && !(childNode.getChild().get(0) instanceof Values)) { if (childNode.getChild().size() > 1) { throw new RuntimeException("AppendPipe doesn't support multiply children subquery."); } - childNode = (UnresolvedPlan) childNode.getChild().getFirst(); + childNode = (UnresolvedPlan) childNode.getChild().get(0); } - childNode.attach(node.getChild().getFirst()); + childNode.attach(node.getChild().get(0)); subqueryPlan.accept(this, context); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 8bf0ec6be20..2b9ff36669c 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -172,7 +172,7 @@ public UnresolvedPlan visitSubPipeline(OpenSearchPPLParser.SubPipelineContext ct if (cmds.isEmpty()) { throw new IllegalArgumentException("appendpipe [] is empty"); } - UnresolvedPlan seed = visit(cmds.getFirst()); + UnresolvedPlan seed = visit(cmds.get(0)); return cmds.stream().skip(1).map(this::visit).reduce(seed, (left, op) -> op.attach(left)); }