From 0098acb77dbf22b03fb2cefaa57a45b5f1a5c4f0 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 26 Jan 2026 16:54:33 -0800 Subject: [PATCH] Adopt appendcol, appendpipe, multisearch to spath Signed-off-by: Tomoyuki Morita --- .../ast/analysis/FieldResolutionVisitor.java | 30 +++--- .../sql/calcite/CalciteRelNodeVisitor.java | 1 + .../sql/calcite/DynamicFieldsHelper.java | 55 +++++++++-- docs/user/ppl/cmd/spath.md | 2 +- .../remote/CalcitePPLSpathCommandIT.java | 96 +++++++++++++++++++ .../parser/FieldResolutionVisitorTest.java | 32 +++++++ 6 files changed, 197 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java index 5d51efd718..e1748e0e36 100644 --- a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java @@ -349,12 +349,6 @@ public Node visitSearch(Search node, FieldResolutionContext context) { return node; } - @Override - public Node visitAppendPipe(AppendPipe node, FieldResolutionContext context) { - visitChildren(node, context); - return node; - } - @Override public Node visitRegex(Regex node, FieldResolutionContext context) { Set regexFields = extractFieldsFromExpression(node.getField()); @@ -507,8 +501,10 @@ public Node visitFillNull(FillNull node, FieldResolutionContext context) { @Override public Node visitAppendCol(AppendCol node, FieldResolutionContext context) { - throw new IllegalArgumentException( - "AppendCol command cannot be used together with spath command"); + // dispatch requirements to subsearch and main + acceptAndVerifyNodeVisited(node.getSubSearch(), context); + visitChildren(node, context); + return node; } @Override @@ -520,9 +516,10 @@ public Node visitAppend(Append node, FieldResolutionContext context) { } @Override - public Node visitMultisearch(Multisearch node, 
FieldResolutionContext context) { - throw new IllegalArgumentException( - "Multisearch command cannot be used together with spath command"); + public Node visitAppendPipe(AppendPipe node, FieldResolutionContext context) { + acceptAndVerifyNodeVisited(node.getSubQuery(), context); + visitChildren(node, context); + return node; } @Override @@ -532,7 +529,16 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) { @Override public Node visitValues(Values node, FieldResolutionContext context) { - throw new IllegalArgumentException("Values command cannot be used together with spath command"); + // do nothing + return node; + } + + @Override + public Node visitMultisearch(Multisearch node, FieldResolutionContext context) { + // dispatch requirements to subsearches and main + node.getSubsearches().forEach(subsearch -> acceptAndVerifyNodeVisited(subsearch, context)); + visitChildren(node, context); + return node; } @Override diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index d64cc205cd..c8701d274d 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -2232,6 +2232,7 @@ public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) { prunedSubSearch.accept(this, context); subsearchNodes.add(context.relBuilder.build()); } + subsearchNodes = DynamicFieldsHelper.adjustInputsForDynamicFields(subsearchNodes, context); // Use shared schema merging logic that handles type conflicts via field renaming List alignedNodes = diff --git a/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java index a8073772a9..05eb48234e 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java +++ 
b/core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -99,8 +100,9 @@ static void adjustJoinInputsForDynamicFields( // build once to modify the inputs already in the stack. RelNode right = context.relBuilder.build(); RelNode left = context.relBuilder.build(); - left = adjustFieldsForDynamicFields(left, right, context); - right = adjustFieldsForDynamicFields(right, left, context); + List inputs = adjustInputsForDynamicFields(List.of(right, left), context); + right = inputs.get(0); + left = inputs.get(1); context.relBuilder.push(left); // `as(alias)` is needed since `build()` won't preserve alias leftAlias.map(alias -> context.relBuilder.as(alias)); @@ -119,6 +121,36 @@ static RelNode adjustFieldsForDynamicFields( return target; } + /** Adjust inputs to align the static/dynamic fields with each other */ + static List adjustInputsForDynamicFields( + List inputs, CalcitePlanContext context) { + boolean requireAdjustment = inputs.stream().anyMatch(input -> hasDynamicFields(input)); + if (requireAdjustment) { + List requiredStaticFields = getRequiredStaticFields(inputs); + return inputs.stream() + .map(input -> adjustFieldsForDynamicFields(input, requiredStaticFields, context)) + .collect(Collectors.toList()); + } else { + return inputs; + } + } + + static List getRequiredStaticFields(List inputs) { + Set requiredStaticFields = new HashSet(); + for (RelNode input : inputs) { + if (hasDynamicFields(input)) { + requiredStaticFields.addAll(getStaticFields(input)); + } + } + return toSortedList(requiredStaticFields); + } + + private static List toSortedList(Collection collection) { + ArrayList result = new ArrayList<>(collection); + Collections.sort(result); + return result; + } + /** * Project node's fields in `requiredFieldNames` as static field, and put 
other fields into `_MAP` * (dynamic fields) This projection is needed when merging an input with dynamic fields and an @@ -128,16 +160,27 @@ static RelNode adjustFieldsForDynamicFields( static RelNode adjustFieldsForDynamicFields( RelNode node, List staticFieldNames, CalcitePlanContext context) { context.relBuilder.push(node); - List existingFields = node.getRowType().getFieldNames(); + List existingFields = getStaticFields(node); List project = new ArrayList<>(); for (String existingField : existingFields) { if (staticFieldNames.contains(existingField)) { project.add(context.rexBuilder.makeInputRef(node, existingFields.indexOf(existingField))); } } - project.add( - context.relBuilder.alias( - getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP)); + if (hasDynamicFields(node)) { + // _MAP = MAP_APPEND(_MAP, MAP(existingFields - staticFields)) + RexNode existingDynamicFieldsMap = context.relBuilder.field(DYNAMIC_FIELDS_MAP); + RexNode additionalFieldsMap = getFieldsAsMap(existingFields, staticFieldNames, context); + RexNode mapAppend = + context.rexBuilder.makeCall( + BuiltinFunctionName.MAP_APPEND, existingDynamicFieldsMap, additionalFieldsMap); + project.add(context.relBuilder.alias(mapAppend, DYNAMIC_FIELDS_MAP)); + } else { + // _MAP = MAP(existingFields - staticFields) + project.add( + context.relBuilder.alias( + getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP)); + } return context.relBuilder.project(project).build(); } diff --git a/docs/user/ppl/cmd/spath.md b/docs/user/ppl/cmd/spath.md index f107b0df03..11d251a957 100644 --- a/docs/user/ppl/cmd/spath.md +++ b/docs/user/ppl/cmd/spath.md @@ -41,7 +41,7 @@ For more information about path syntax, see [json_extract](../functions/json.md# * **Limitation**: Field order in the result could be inconsistent with query without `spath` command, and the behavior might change in the future version. 
* **Limitation**: Filter with subquery (`where in/exists [...]`) is not supported with `spath` command. * **Limitation**: `fillnull` command requires to specify fields when used with `spath` command. -* **Limitation**: Following commands cannot be used together with `spath` command: `appendcol`, `multisearch`, `lookup`. +* **Limitation**: The following command cannot be used together with the `spath` command: `lookup`. * **Performance**: Filter records before `spath` command for best performance (see Example 8) * **Internal Implementation**: The auto extraction feature uses an internal `_MAP` system column to store dynamic fields during query processing. This column is automatically expanded into individual columns in the final results and users don't need to reference it directly. For more information, see [System Columns](../general/identifiers.md#system-columns). diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java index 0c2c2e94a7..e1ffc9e234 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java @@ -262,4 +262,100 @@ public void testAppendWithSpathInSubsearchDynamicFields() throws IOException { rows(null, null, null, "simple", "4", sj("{'a': 1, 'b': 2, 'c': 3}")), rows("1", "3", "2", "simple", null, sj("{'a': 1, 'b': 2, 'c': 3}"))); } + + @Test + public void testAppendColWithSpathInMain() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData | appendcol [where" + + " category='simple'] | fields a, c, *"); + verifySchema( + result, + schema("a", "string"), + schema("c", "string"), + schema("category", "string"), + schema("userData", "string"), + schema("b", "string")); + verifyDataRows( + result, + rows("1", "3", 
"simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"), + rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2")); + } + + @Test + public void testAppendColWithSpathInSubsearch() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | appendcol [where category='simple' |" + + " spath input=userData] | fields a, c, *"); + verifySchema( + result, + schema("a", "string"), + schema("c", "string"), + schema("category", "string"), + schema("userData", "string"), + schema("b", "string")); + verifyDataRows( + result, + rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"), + rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2")); + } + + @Test + public void testAppendColWithSpathInBothInputs() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData | appendcol [where" + + " category='simple' | spath input=userData ] | fields a, c, *"); + verifySchema( + result, + schema("a", "string"), + schema("c", "string"), + schema("b", "string"), + schema("category", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows("1", "3", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}")), + rows("1", "3", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"))); + } + + @Test + public void testAppendPipeWithSpathInMain() throws IOException { + JSONObject result = + executeQuery( + "source=test_json | where category='simple' | spath input=userData | stats sum(a) as" + + " total by b | appendpipe [stats sum(total) as total] | head 5"); + verifySchema(result, schema("total", "double"), schema("b", "string")); + verifyDataRows(result, rows(2, "2"), rows(2, null)); + } + + @Test + public void testMultisearchWithSpath() throws IOException { + JSONObject result = + executeQuery( + "| multisearch [source=test_json | where category='simple' | spath input=userData |" + + " head 1] [source=test_json | where category='nested' | spath input=userData] |" + + 
" fields a, c, *"); + verifySchema( + result, + schema("a", "string"), + schema("c", "string"), + schema("b", "string"), + schema("category", "string"), + schema("nested.d{}", "string"), + schema("nested.e", "string"), + schema("userData", "string")); + verifyDataRows( + result, + rows("1", "3", "2", "simple", null, null, sj("{'a': 1, 'b': 2, 'c': 3}")), + rows( + null, + null, + null, + "nested", + "[1, 2, 3]", + "str", + sj("{'nested': {'d': [1, 2, 3], 'e': 'str'}}"))); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java index a4d4f4874a..72761bb96f 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java @@ -371,6 +371,38 @@ public void testAppend() { "sub", new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"))); } + @Test + public void testAppendCol() { + String query = + "source=main | where testCase='simple' | eval c = 4 | " + + "appendcol [where testCase='simple' ] | fields a, c, *"; + assertMultiRelationFields( + query, Map.of("main", new FieldResolutionResult(Set.of("a", "testCase"), "*"))); + } + + @Test + public void testAppendpipe() { + String query = + "source=main | where testCase='simple' | stats sum(a) as sum_a by b | " + + "appendpipe [stats sum(sum_a) as total] | head 5"; + assertMultiRelationFields( + query, Map.of("main", new FieldResolutionResult(Set.of("a", "b", "testCase")))); + } + + @Test + public void testMultisearch() { + String query = + "| multisearch [source=main | where testCase='simple'] [source=sub | where" + + " testCase='simple'] | fields a, c, *"; + assertMultiRelationFields( + query, + Map.of( + "main", + new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"), + "sub", + new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"))); + } + @Test public void 
testAppendWithSpathInMain() { String query =