-
Notifications
You must be signed in to change notification settings - Fork 180
Mvexpand feature #4944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Mvexpand feature #4944
Changes from all commits
8362fc2
384ba15
7f382f9
44c8124
3cad64e
8e4a2c5
474617d
d502b03
c62defe
a3799b2
d90be9f
da16288
1301e06
beb31de
627ef8f
58facf8
63cdbf7
bdc3aa1
fc8e345
c830356
e9b6f27
fa9436e
ea091d2
4d9b24d
b9d3164
26a59a4
43c806e
a07dff2
7be7473
2c0ea2c
8749289
9508874
08b56ee
3ae2c73
bed2084
5e616ff
709704c
c45fa05
4f3435e
a0b2c8c
47779e1
bf6b924
9aec421
bf87312
c9e2767
2591a6c
125cf3b
00c990f
44814ab
f9dd692
69d6a5a
2464675
0f86c52
32d3867
07509ae
34db739
602358e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.List; | ||
| import javax.annotation.Nullable; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.expression.Field; | ||
|
|
||
| /** AST node representing an {@code mvexpand <field> [limit N]} operation. */ | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| @ToString | ||
| @EqualsAndHashCode(callSuper = false) | ||
| public class MvExpand extends UnresolvedPlan { | ||
|
|
||
| private UnresolvedPlan child; | ||
| @Getter private final Field field; | ||
| @Getter @Nullable private final Integer limit; | ||
|
|
||
| public MvExpand(Field field, @Nullable Integer limit) { | ||
| this.field = field; | ||
| this.limit = limit; | ||
| } | ||
|
|
||
| @Override | ||
| public MvExpand attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public List<UnresolvedPlan> getChild() { | ||
| return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); | ||
| } | ||
|
|
||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitMvExpand(this, context); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,7 @@ | |
| import org.opensearch.sql.ast.expression.ParseMethod; | ||
| import org.opensearch.sql.ast.expression.PatternMethod; | ||
| import org.opensearch.sql.ast.expression.PatternMode; | ||
| import org.opensearch.sql.ast.expression.QualifiedName; | ||
| import org.opensearch.sql.ast.expression.Span; | ||
| import org.opensearch.sql.ast.expression.SpanUnit; | ||
| import org.opensearch.sql.ast.expression.UnresolvedExpression; | ||
|
|
@@ -122,6 +123,7 @@ | |
| import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; | ||
| import org.opensearch.sql.ast.tree.ML; | ||
| import org.opensearch.sql.ast.tree.Multisearch; | ||
| import org.opensearch.sql.ast.tree.MvExpand; | ||
| import org.opensearch.sql.ast.tree.Paginate; | ||
| import org.opensearch.sql.ast.tree.Parse; | ||
| import org.opensearch.sql.ast.tree.Patterns; | ||
|
|
@@ -845,7 +847,11 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) { | |
| .toList(); | ||
| context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggCall); | ||
| buildExpandRelNode( | ||
| context.relBuilder.field(node.getAlias()), node.getAlias(), node.getAlias(), context); | ||
| context.relBuilder.field(node.getAlias()), | ||
| node.getAlias(), | ||
| node.getAlias(), | ||
| null, | ||
| context); | ||
| flattenParsedPattern( | ||
| node.getAlias(), | ||
| context.relBuilder.field(node.getAlias()), | ||
|
|
@@ -3093,11 +3099,102 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { | |
| RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context); | ||
| String alias = expand.getAlias(); | ||
|
|
||
| buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, context); | ||
| buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, null, context); | ||
|
|
||
| return context.relBuilder.peek(); | ||
| } | ||
|
|
||
| /** | ||
| * MVExpand command visitor. | ||
| * | ||
| * <p>For Calcite remote planning, mvexpand shares the same expansion mechanics as {@link Expand}: | ||
| * it unnests the target multivalue field and joins back to the original relation. The additional | ||
| * mvexpand semantics (such as an optional per-document limit) are surfaced via the MVExpand AST | ||
| * node but reuse the same underlying RelBuilder pipeline as expand at this layer. | ||
| * | ||
| * @param mvExpand MVExpand command to be visited | ||
| * @param context CalcitePlanContext containing the RelBuilder and other context | ||
| * @return RelNode representing records with the expanded multi-value field | ||
| */ | ||
| @Override | ||
| public RelNode visitMvExpand(MvExpand mvExpand, CalcitePlanContext context) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does it have lots of difference with visitExpand? Should it be mostly same other than |
||
| // 1. Visit children | ||
| visitChildren(mvExpand, context); | ||
|
|
||
| RelBuilder relBuilder = context.relBuilder; | ||
| RelDataType rowType = relBuilder.peek().getRowType(); | ||
|
|
||
| Field field = mvExpand.getField(); | ||
|
|
||
| String fieldName = extractFieldName(field); | ||
|
|
||
| // 2. Lookup field | ||
| RelDataTypeField matched = rowType.getField(fieldName, false, false); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should rely on |
||
|
|
||
| // 2A. Missing field → empty result: zero rows, but the schema still includes the field | ||
| if (matched == null) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why don't we raise an error when the field is not found? |
||
| // Schema must include the missing field, even if no rows returned. | ||
| List<RelDataTypeField> fields = rowType.getFieldList(); | ||
| List<RexNode> projects = new ArrayList<>(); | ||
| List<String> names = new ArrayList<>(); | ||
|
|
||
| // Keep existing fields | ||
| for (RelDataTypeField f : fields) { | ||
| projects.add(relBuilder.field(f.getIndex())); | ||
| names.add(f.getName()); | ||
| } | ||
|
|
||
| // Add NULL for missing field | ||
| projects.add(relBuilder.literal(null)); | ||
| names.add(fieldName); | ||
|
|
||
| relBuilder.project(projects, names); | ||
|
|
||
| // Now return 0 rows | ||
| relBuilder.filter(relBuilder.literal(false)); | ||
|
|
||
| return relBuilder.peek(); | ||
| } | ||
|
|
||
| // 2B. Non-array → throw SemanticCheckException | ||
| RelDataType type = matched.getType(); | ||
| SqlTypeName sqlType = type.getSqlTypeName(); | ||
|
|
||
| if (sqlType != SqlTypeName.ARRAY) { | ||
| throw new SemanticCheckException( | ||
| String.format( | ||
| "Cannot expand field '%s': expected ARRAY type but found %s", | ||
| fieldName, sqlType.getName())); | ||
| } | ||
|
|
||
| // 2C. Valid array → expand (with optional per-document limit) | ||
| int index = matched.getIndex(); | ||
| RexInputRef fieldRef = context.rexBuilder.makeInputRef(type, index); | ||
|
|
||
| Integer limit = mvExpand.getLimit(); | ||
| if (limit != null && limit <= 0) { | ||
| throw new SemanticCheckException( | ||
| String.format("mvexpand limit must be positive, but got %d", limit)); | ||
| } | ||
| buildExpandRelNode(fieldRef, fieldName, fieldName, limit, context); | ||
|
|
||
| return relBuilder.peek(); | ||
| } | ||
|
|
||
| private String extractFieldName(Field f) { | ||
| UnresolvedExpression inner = f.getField(); | ||
|
|
||
| if (inner instanceof QualifiedName) { | ||
| List<String> parts = ((QualifiedName) inner).getParts(); | ||
| if (!parts.isEmpty()) { | ||
| return String.join(".", parts); | ||
| } | ||
| } | ||
|
|
||
| // Fallback - return clean string | ||
| return inner.toString().replace("`", ""); | ||
| } | ||
|
|
||
| @Override | ||
| public RelNode visitValues(Values values, CalcitePlanContext context) { | ||
| if (values.getValues() == null || values.getValues().isEmpty()) { | ||
|
|
@@ -3342,7 +3439,11 @@ private void flattenParsedPattern( | |
| } | ||
|
|
||
| private void buildExpandRelNode( | ||
| RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) { | ||
| RexInputRef arrayFieldRex, | ||
| String arrayFieldName, | ||
| String alias, | ||
| @Nullable Integer perDocLimit, | ||
| CalcitePlanContext context) { | ||
| // 3. Capture the outer row in a CorrelationId | ||
| Holder<RexCorrelVariable> correlVariable = Holder.empty(); | ||
| context.relBuilder.variable(correlVariable::set); | ||
|
|
@@ -3357,14 +3458,17 @@ private void buildExpandRelNode( | |
| RelNode leftNode = context.relBuilder.build(); | ||
|
|
||
| // 5. Build join right node and expand the array field using uncollect | ||
| RelNode rightNode = | ||
| context | ||
| .relBuilder | ||
| // fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter | ||
| .push(LogicalValues.createOneRow(context.relBuilder.getCluster())) | ||
| .project(List.of(correlArrayFieldAccess), List.of(arrayFieldName)) | ||
| .uncollect(List.of(), false) | ||
| .build(); | ||
| context | ||
| .relBuilder | ||
| // fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter | ||
| .push(LogicalValues.createOneRow(context.relBuilder.getCluster())) | ||
| .project(List.of(correlArrayFieldAccess), List.of(arrayFieldName)) | ||
| .uncollect(List.of(), false); | ||
|
|
||
| if (perDocLimit != null) { | ||
| context.relBuilder.limit(0, perDocLimit); | ||
| } | ||
| RelNode rightNode = context.relBuilder.build(); | ||
|
|
||
| // 6. Perform a nested-loop join (correlate) between the original table and the expanded | ||
| // array field. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1108,6 +1108,14 @@ void populate() { | |
| OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.INTEGER) | ||
| .or(OperandTypes.family(SqlTypeFamily.MAP, SqlTypeFamily.ANY)), | ||
| false)); | ||
| // Allow using INTERNAL_ITEM when the element type is unknown/undefined at planning time. | ||
| // Some datasets (or Calcite's type inference) may give the element an UNDEFINED type. | ||
| // Accept an IGNORE first-argument family so INTERNAL_ITEM(elem, 'key') can still be planned | ||
| // and resolved at runtime (fallback semantics are handled on the execution side). Used by MVEXPAND. | ||
| registerOperator( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure if I missed some context. What is this used for? Please point me to the related comment, if any.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code defines a fallback type-checker for INTERNAL_ITEM to handle cases where the element type is undefined at planning time. It ensures the operator accepts composite types (IGNORE, CHARACTER) for resilient type validation during query execution.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Could you clarify a little more which query/test dataset triggers this problem? If there is a related previous comment, please let me know. Thanks!
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Without that code, tests fail because the INTERNAL_ITEM operator requires specific argument types ({ARRAY, INTEGER} or {STRUCT, ANY}) for type validation during query execution. When Calcite assigns an UNDEFINED type at planning time, the absence of fallback handling in registerOperator causes validation to fail, triggering ExpressionEvaluationException. With that code, the fallback type-checker accepts UNDEFINED types as valid arguments (IGNORE, CHARACTER), allowing queries to pass type validation, execute correctly, and produce expected results in the tests. Example: I commented out that code and ran my IT; below is the failure: testMvexpandDuplicate org.opensearch.client.ResponseException: method [POST], host [http://127.0.0.1:65246], URI [/_plugins/_ppl], status line [HTTP/1.1 400 Bad Request] |
||
| INTERNAL_ITEM, | ||
| SqlStdOperatorTable.ITEM, | ||
| PPLTypeChecker.family(SqlTypeFamily.IGNORE, SqlTypeFamily.CHARACTER)); | ||
| registerOperator( | ||
| XOR, | ||
| SqlStdOperatorTable.NOT_EQUALS, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.