@@ -349,12 +349,6 @@ public Node visitSearch(Search node, FieldResolutionContext context) {
return node;
}

@Override
public Node visitAppendPipe(AppendPipe node, FieldResolutionContext context) {
visitChildren(node, context);
return node;
}

@Override
public Node visitRegex(Regex node, FieldResolutionContext context) {
Set<String> regexFields = extractFieldsFromExpression(node.getField());
@@ -507,8 +501,10 @@ public Node visitFillNull(FillNull node, FieldResolutionContext context) {

@Override
public Node visitAppendCol(AppendCol node, FieldResolutionContext context) {
throw new IllegalArgumentException(
"AppendCol command cannot be used together with spath command");
// dispatch field requirements to both the subsearch and the main query
acceptAndVerifyNodeVisited(node.getSubSearch(), context);
visitChildren(node, context);
return node;
}

@Override
@@ -520,9 +516,10 @@ public Node visitAppend(Append node, FieldResolutionContext context) {
}

@Override
public Node visitMultisearch(Multisearch node, FieldResolutionContext context) {
throw new IllegalArgumentException(
"Multisearch command cannot be used together with spath command");
public Node visitAppendPipe(AppendPipe node, FieldResolutionContext context) {
acceptAndVerifyNodeVisited(node.getSubQuery(), context);
visitChildren(node, context);
return node;
}

@Override
Expand All @@ -532,7 +529,16 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) {

@Override
public Node visitValues(Values node, FieldResolutionContext context) {
throw new IllegalArgumentException("Values command cannot be used together with spath command");
// do nothing
return node;
}

@Override
public Node visitMultisearch(Multisearch node, FieldResolutionContext context) {
// dispatch field requirements to each subsearch and the main query
node.getSubsearches().forEach(subsearch -> acceptAndVerifyNodeVisited(subsearch, context));
visitChildren(node, context);
return node;
}

@Override
@@ -2232,6 +2232,7 @@ public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
prunedSubSearch.accept(this, context);
subsearchNodes.add(context.relBuilder.build());
}
subsearchNodes = DynamicFieldsHelper.adjustInputsForDynamicFields(subsearchNodes, context);

// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> alignedNodes =
@@ -10,6 +10,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -99,8 +100,9 @@ static void adjustJoinInputsForDynamicFields(
// build once to modify the inputs already in the stack.
RelNode right = context.relBuilder.build();
RelNode left = context.relBuilder.build();
left = adjustFieldsForDynamicFields(left, right, context);
right = adjustFieldsForDynamicFields(right, left, context);
List<RelNode> inputs = adjustInputsForDynamicFields(List.of(right, left), context);
right = inputs.get(0);
left = inputs.get(1);
context.relBuilder.push(left);
// `as(alias)` is needed since `build()` won't preserve alias
leftAlias.map(alias -> context.relBuilder.as(alias));
@@ -119,6 +121,36 @@ static RelNode adjustFieldsForDynamicFields(
return target;
}

/** Adjust the inputs so that their static/dynamic fields align with each other. */
static List<RelNode> adjustInputsForDynamicFields(
List<RelNode> inputs, CalcitePlanContext context) {
boolean requireAdjustment = inputs.stream().anyMatch(input -> hasDynamicFields(input));
if (requireAdjustment) {
List<String> requiredStaticFields = getRequiredStaticFields(inputs);
return inputs.stream()
.map(input -> adjustFieldsForDynamicFields(input, requiredStaticFields, context))
.collect(Collectors.toList());
} else {
return inputs;
}
}

static List<String> getRequiredStaticFields(List<RelNode> inputs) {
Set<String> requiredStaticFields = new HashSet<>();
for (RelNode input : inputs) {
if (hasDynamicFields(input)) {
requiredStaticFields.addAll(getStaticFields(input));
}
}
return toSortedList(requiredStaticFields);
}

private static List<String> toSortedList(Collection<String> collection) {
ArrayList<String> result = new ArrayList<>(collection);
Collections.sort(result);
return result;
}

/**
* Project the node's fields listed in `requiredFieldNames` as static fields, and put the other fields into `_MAP`
* (dynamic fields). This projection is needed when merging an input with dynamic fields and an
@@ -128,16 +160,27 @@ static RelNode adjustFieldsForDynamicFields(
static RelNode adjustFieldsForDynamicFields(
RelNode node, List<String> staticFieldNames, CalcitePlanContext context) {
context.relBuilder.push(node);
List<String> existingFields = node.getRowType().getFieldNames();
List<String> existingFields = getStaticFields(node);
List<RexNode> project = new ArrayList<>();
for (String existingField : existingFields) {
if (staticFieldNames.contains(existingField)) {
project.add(context.rexBuilder.makeInputRef(node, existingFields.indexOf(existingField)));
}
}
project.add(
context.relBuilder.alias(
getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP));
if (hasDynamicFields(node)) {
// _MAP = MAP_APPEND(_MAP, MAP(existingFields - staticFields))
RexNode existingDynamicFieldsMap = context.relBuilder.field(DYNAMIC_FIELDS_MAP);
RexNode additionalFieldsMap = getFieldsAsMap(existingFields, staticFieldNames, context);
RexNode mapAppend =
context.rexBuilder.makeCall(
BuiltinFunctionName.MAP_APPEND, existingDynamicFieldsMap, additionalFieldsMap);
project.add(context.relBuilder.alias(mapAppend, DYNAMIC_FIELDS_MAP));
} else {
// _MAP = MAP(existingFields - staticFields)
project.add(
context.relBuilder.alias(
getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP));
}
return context.relBuilder.project(project).build();
}
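To make the alignment rule above easier to follow outside of the diff, here is a minimal stand-alone sketch of the same idea in plain Java (hypothetical names and plain collections only; this is not the Calcite/RelNode implementation): when at least one input carries dynamic fields, every input keeps the union of the static fields observed on the dynamic inputs, and folds its remaining columns into a `_MAP`-style bucket.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Stand-alone illustration only (hypothetical names, plain collections, no Calcite).
public class DynamicFieldAlignmentSketch {

  // An input is described by its static column names plus whether it also has a _MAP column.
  record Input(List<String> staticFields, boolean hasDynamicFields) {}

  // Result of alignment: which columns stay static and which get folded into _MAP.
  record Aligned(List<String> keptStatic, List<String> foldedIntoMap) {}

  static List<Aligned> adjustInputs(List<Input> inputs) {
    List<Aligned> result = new ArrayList<>();
    // No input has dynamic fields: nothing to align.
    if (inputs.stream().noneMatch(Input::hasDynamicFields)) {
      inputs.forEach(in -> result.add(new Aligned(in.staticFields(), List.of())));
      return result;
    }
    // Required static fields = sorted union of the static fields of the dynamic inputs.
    TreeSet<String> required = new TreeSet<>();
    for (Input in : inputs) {
      if (in.hasDynamicFields()) {
        required.addAll(in.staticFields());
      }
    }
    // Each input keeps the required fields it already has; everything else goes to _MAP.
    for (Input in : inputs) {
      List<String> kept = in.staticFields().stream().filter(required::contains).toList();
      List<String> folded = in.staticFields().stream().filter(f -> !required.contains(f)).toList();
      result.add(new Aligned(kept, folded));
    }
    return result;
  }

  public static void main(String[] args) {
    // Main search ran spath (dynamic); the appendcol subsearch did not.
    Input main = new Input(List.of("category", "userData"), true);
    Input sub = new Input(List.of("category", "userData", "extra"), false);
    adjustInputs(List.of(main, sub)).forEach(System.out::println);
  }
}

In the actual helper the folded fields are of course not discarded: they are wrapped into a MAP expression and, when the input already has a `_MAP` column, merged into it with `MAP_APPEND`, as the hunk above shows.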

2 changes: 1 addition & 1 deletion docs/user/ppl/cmd/spath.md
@@ -41,7 +41,7 @@ For more information about path syntax, see [json_extract](../functions/json.md#
* **Limitation**: Field order in the result could be inconsistent with the same query run without the `spath` command, and the behavior might change in a future version.
* **Limitation**: Filtering with a subquery (`where <field> in/exists [...]`) is not supported with the `spath` command.
* **Limitation**: The `fillnull` command requires fields to be specified when used with the `spath` command.
* **Limitation**: Following commands cannot be used together with `spath` command: `appendcol`, `multisearch`, `lookup`.
* **Limitation**: The following command cannot be used together with the `spath` command: `lookup`.
* **Performance**: Filter records before `spath` command for best performance (see Example 8)

* **Internal Implementation**: The auto extraction feature uses an internal `_MAP` system column to store dynamic fields during query processing. This column is automatically expanded into individual columns in the final results and users don't need to reference it directly. For more information, see [System Columns](../general/identifiers.md#system-columns).
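As a rough illustration of the `_MAP` expansion described in the note above, the following hypothetical Java sketch (not the engine's code) shows what flattening a row whose dynamic fields live in `_MAP` into one column per entry amounts to:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: flatten a row's _MAP entries into top-level columns.
public class MapExpansionSketch {
  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("category", "simple");
    row.put("_MAP", Map.of("a", "1", "b", "2", "c", "3"));

    Map<String, Object> expanded = new LinkedHashMap<>();
    row.forEach((name, value) -> {
      if ("_MAP".equals(name) && value instanceof Map<?, ?> dynamic) {
        // Each dynamic field becomes its own column in the final result.
        dynamic.forEach((k, v) -> expanded.put(String.valueOf(k), v));
      } else {
        expanded.put(name, value);
      }
    });
    System.out.println(expanded); // e.g. {category=simple, a=1, b=2, c=3} (order of a/b/c may vary)
  }
}

(The exact column order of the expanded fields is an implementation detail; see the field-order limitation above.)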
@@ -262,4 +262,100 @@ public void testAppendWithSpathInSubsearchDynamicFields() throws IOException {
rows(null, null, null, "simple", "4", sj("{'a': 1, 'b': 2, 'c': 3}")),
rows("1", "3", "2", "simple", null, sj("{'a': 1, 'b': 2, 'c': 3}")));
}

@Test
public void testAppendColWithSpathInMain() throws IOException {
JSONObject result =
executeQuery(
"source=test_json | where category='simple' | spath input=userData | appendcol [where"
+ " category='simple'] | fields a, c, *");
verifySchema(
result,
schema("a", "string"),
schema("c", "string"),
schema("category", "string"),
schema("userData", "string"),
schema("b", "string"));
verifyDataRows(
result,
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"),
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"));
}

@Test
public void testAppendColWithSpathInSubsearch() throws IOException {
JSONObject result =
executeQuery(
"source=test_json | where category='simple' | appendcol [where category='simple' |"
+ " spath input=userData] | fields a, c, *");
verifySchema(
result,
schema("a", "string"),
schema("c", "string"),
schema("category", "string"),
schema("userData", "string"),
schema("b", "string"));
verifyDataRows(
result,
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"),
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"));
}

@Test
public void testAppendColWithSpathInBothInputs() throws IOException {
JSONObject result =
executeQuery(
"source=test_json | where category='simple' | spath input=userData | appendcol [where"
+ " category='simple' | spath input=userData ] | fields a, c, *");
verifySchema(
result,
schema("a", "string"),
schema("c", "string"),
schema("b", "string"),
schema("category", "string"),
schema("userData", "string"));
verifyDataRows(
result,
rows("1", "3", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}")),
rows("1", "3", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}")));
}

@Test
public void testAppendPipeWithSpathInMain() throws IOException {
JSONObject result =
executeQuery(
"source=test_json | where category='simple' | spath input=userData | stats sum(a) as"
+ " total by b | appendpipe [stats sum(total) as total] | head 5");
verifySchema(result, schema("total", "double"), schema("b", "string"));
verifyDataRows(result, rows(2, "2"), rows(2, null));
}

@Test
public void testMultisearchWithSpath() throws IOException {
JSONObject result =
executeQuery(
"| multisearch [source=test_json | where category='simple' | spath input=userData |"
+ " head 1] [source=test_json | where category='nested' | spath input=userData] |"
+ " fields a, c, *");
verifySchema(
result,
schema("a", "string"),
schema("c", "string"),
schema("b", "string"),
schema("category", "string"),
schema("nested.d{}", "string"),
schema("nested.e", "string"),
schema("userData", "string"));
verifyDataRows(
result,
rows("1", "3", "2", "simple", null, null, sj("{'a': 1, 'b': 2, 'c': 3}")),
rows(
null,
null,
null,
"nested",
"[1, 2, 3]",
"str",
sj("{'nested': {'d': [1, 2, 3], 'e': 'str'}}")));
}
}
@@ -371,6 +371,38 @@ public void testAppend() {
"sub", new FieldResolutionResult(Set.of("a", "c", "testCase"), "*")));
}

@Test
public void testAppendCol() {
String query =
"source=main | where testCase='simple' | eval c = 4 | "
+ "appendcol [where testCase='simple' ] | fields a, c, *";
assertMultiRelationFields(
query, Map.of("main", new FieldResolutionResult(Set.of("a", "testCase"), "*")));
}

@Test
public void testAppendpipe() {
String query =
"source=main | where testCase='simple' | stats sum(a) as sum_a by b | "
+ "appendpipe [stats sum(sum_a) as total] | head 5";
assertMultiRelationFields(
query, Map.of("main", new FieldResolutionResult(Set.of("a", "b", "testCase"))));
}

@Test
public void testMultisearch() {
String query =
"| multisearch [source=main | where testCase='simple'] [source=sub | where"
+ " testCase='simple'] | fields a, c, *";
assertMultiRelationFields(
query,
Map.of(
"main",
new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"),
"sub",
new FieldResolutionResult(Set.of("a", "c", "testCase"), "*")));
}

@Test
public void testAppendWithSpathInMain() {
String query =