Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1262,15 +1262,6 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<UnresolvedPlan> children = node.getChildren();
children.forEach(c -> analyze(c, context));
// add join.subsearch_maxout limit to subsearch side, 0 and negative means unlimited.
if (context.sysLimit.joinSubsearchLimit() > 0) {
PlanUtils.replaceTop(
context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
context.relBuilder.peek(),
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
}
if (node.getJoinCondition().isEmpty()) {
// join-with-field-list grammar
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();
Expand Down Expand Up @@ -1327,23 +1318,25 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
.collect(Collectors.toList());
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
}
// add LogicalSystemLimit after dedup
addSysLimitForJoinSubsearch(context);
context.relBuilder.join(
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
if (!toBeRemovedFields.isEmpty()) {
context.relBuilder.projectExcept(toBeRemovedFields);
}
return context.relBuilder.peek();
}
// The join-with-criteria grammar doesn't allow empty join condition
RexNode joinCondition =
node.getJoinCondition()
.map(c -> rexVisitor.analyzeJoinCondition(c, context))
.orElse(context.relBuilder.literal(true));
if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) {
// semi and anti join only return left table outputs
context.relBuilder.join(
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
} else {
// The join-with-criteria grammar doesn't allow empty join condition
RexNode joinCondition =
node.getJoinCondition()
.map(c -> rexVisitor.analyzeJoinCondition(c, context))
.orElse(context.relBuilder.literal(true));
if (node.getJoinType() == SEMI || node.getJoinType() == ANTI) {
// semi and anti join only return left table outputs
context.relBuilder.join(
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
return context.relBuilder.peek();
}
// Join condition could contain duplicated column name, Calcite will rename the duplicated
// column name with numeric suffix, e.g. ON t1.id = t2.id, the output contains `id` and `id0`
// when a new project add to stack. To avoid `id0`, we will rename the `id0` to `alias.id`
Expand Down Expand Up @@ -1383,6 +1376,8 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {

buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
}
// add LogicalSystemLimit after dedup
addSysLimitForJoinSubsearch(context);
context.relBuilder.join(
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
JoinAndLookupUtils.renameToExpectedFields(
Expand All @@ -1391,6 +1386,18 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

private static void addSysLimitForJoinSubsearch(CalcitePlanContext context) {
// add join.subsearch_maxout limit to subsearch side, 0 and negative means unlimited.
if (context.sysLimit.joinSubsearchLimit() > 0) {
PlanUtils.replaceTop(
context.relBuilder,
LogicalSystemLimit.create(
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
context.relBuilder.peek(),
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
}
}

private List<RexNode> getRightColumnsInJoinCriteria(
RelBuilder relBuilder, RexNode joinCondition) {
int stackSize = relBuilder.size();
Expand Down Expand Up @@ -1566,7 +1573,6 @@ private static void buildDedupOrNull(
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.partitionBy(dedupeFields)
.orderBy(dedupeFields)
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_FOR_DEDUP);
context.relBuilder.projectPlus(rowNumber);
Expand Down Expand Up @@ -1608,7 +1614,6 @@ private static void buildDedupNotNull(
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.partitionBy(dedupeFields)
.orderBy(dedupeFields)
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(rowNumberAlias);
context.relBuilder.projectPlus(rowNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,11 @@ static RexNode derefMapCall(RexNode rexNode) {

/** Check if contains dedup */
static boolean containsRowNumberDedup(RelNode node) {
return node.getRowType().getFieldNames().stream().anyMatch(ROW_NUMBER_COLUMN_FOR_DEDUP::equals);
return node.getRowType().getFieldNames().stream()
.anyMatch(
name ->
name.equals(ROW_NUMBER_COLUMN_FOR_DEDUP)
|| name.equals(ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP));
}

/** Check if contains dedup for top/rare */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public void supportSearchSargPushDown_timeRange() throws IOException {
// Only for Calcite
@Test
public void testJoinWithCriteriaAndMaxOption() throws IOException {
// TODO could be optimized with https://github.com/opensearch-project/OpenSearch/issues/3725
enabledOnlyWhenPushdownIsEnabled();
String query =
"source=opensearch-sql_test_index_bank | join max=1 left=l right=r on"
+ " l.account_number=r.account_number opensearch-sql_test_index_bank";
Expand All @@ -99,8 +97,6 @@ public void testJoinWithCriteriaAndMaxOption() throws IOException {
// Only for Calcite
@Test
public void testJoinWithFieldListAndMaxOption() throws IOException {
// TODO could be optimized with https://github.com/opensearch-project/OpenSearch/issues/3725
enabledOnlyWhenPushdownIsEnabled();
String query =
"source=opensearch-sql_test_index_bank | join type=inner max=1 account_number"
+ " opensearch-sql_test_index_bank";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ public void testTopTCPFlowsByBytes() throws IOException {
public void testTopTCPFlags() throws IOException {
String query =
String.format(
"source=%s | STATS count() as Count by `event.tcp.tcp_flags` | SORT - Count, `event.tcp.tcp_flags` | HEAD 10",
"source=%s | STATS count() as Count by `event.tcp.tcp_flags` | SORT - Count,"
+ " `event.tcp.tcp_flags` | HEAD 10",
NFW_LOGS_INDEX);
JSONObject response = executeQuery(query);
verifySchema(response, schema("Count", "bigint"), schema("event.tcp.tcp_flags", "string"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.ppl.dashboard;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
Expand Down Expand Up @@ -167,7 +166,8 @@ public void testTopTalkersByBytes() throws IOException {
public void testTopTalkersByPackets() throws IOException {
String query =
String.format(
"source=%s | stats sum(packets) as Packets by srcaddr | sort - Packets, srcaddr | head 10",
"source=%s | stats sum(packets) as Packets by srcaddr | sort - Packets, srcaddr | head"
+ " 10",
VPC_FLOW_LOGS_INDEX);
JSONObject response = executeQuery(query);
verifySchema(response, schema("Packets", null, "bigint"), schema("srcaddr", null, "string"));
Expand All @@ -189,7 +189,8 @@ public void testTopTalkersByPackets() throws IOException {
public void testTopDestinationsByPackets() throws IOException {
String query =
String.format(
"source=%s | stats sum(packets) as Packets by dstaddr | sort - Packets, dstaddr | head 10",
"source=%s | stats sum(packets) as Packets by dstaddr | sort - Packets, dstaddr | head"
+ " 10",
VPC_FLOW_LOGS_INDEX);
JSONObject response = executeQuery(query);
verifySchema(response, schema("Packets", null, "bigint"), schema("dstaddr", null, "string"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public void testTopCountries() throws IOException {
public void testTopTerminatingRules() throws IOException {
String query =
String.format(
"source=%s | stats count() as Count by `terminatingRuleId` | sort - Count, `terminatingRuleId` | head 10",
"source=%s | stats count() as Count by `terminatingRuleId` | sort - Count,"
+ " `terminatingRuleId` | head 10",
WAF_LOGS_INDEX);

JSONObject response = executeQuery(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ calcite:
LogicalJoin(condition=[=($12, $19)], joinType=[left])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], lastname=[REX_EXTRACT($6, '(?<lastname>^[A-Z])', 'lastname')])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])
LogicalFilter(condition=[<=($13, 1)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _row_number_join_max_dedup_=[ROW_NUMBER() OVER (PARTITION BY $6 ORDER BY $6)])
LogicalFilter(condition=[IS NOT NULL($6)])
LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])
LogicalSystemLimit(fetch=[50000], type=[JOIN_SUBSEARCH_MAXOUT])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])
LogicalFilter(condition=[<=($13, 1)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _row_number_join_max_dedup_=[ROW_NUMBER() OVER (PARTITION BY $6)])
LogicalFilter(condition=[IS NOT NULL($6)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
physical: |
Expand All @@ -18,8 +18,4 @@ calcite:
EnumerableMergeJoin(condition=[=($0, $7)], joinType=[left])
EnumerableCalc(expr#0=[{inputs}], expr#1=['(?<lastname>^[A-Z])'], expr#2=['lastname'], expr#3=[REX_EXTRACT($t0, $t1, $t2)], $f0=[$t3])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[lastname], LIMIT->10000, SORT_EXPR->[REX_EXTRACT($0, '(?<lastname>^[A-Z])', 'lastname') ASCENDING NULLS_LAST]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["lastname"],"excludes":[]},"sort":[{"_script":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQC63sKICAib3AiOiB7CiAgICAibmFtZSI6ICJSRVhfRVhUUkFDVCIsCiAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAic3ludGF4IjogIkZVTkNUSU9OIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAiZHluYW1pY1BhcmFtIjogMCwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgIm51bGxhYmxlIjogdHJ1ZSwKICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgfQogICAgfSwKICAgIHsKICAgICAgImR5bmFtaWNQYXJhbSI6IDEsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgIH0KICAgIH0sCiAgICB7CiAgICAgICJkeW5hbWljUGFyYW0iOiAyLAogICAgICAidHlwZSI6IHsKICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9CiAgXSwKICAiY2xhc3MiOiAib3JnLm9wZW5zZWFyY2guc3FsLmV4cHJlc3Npb24uZnVuY3Rpb24uVXNlckRlZmluZWRGdW5jdGlvbkJ1aWxkZXIkMSIsCiAgInR5cGUiOiB7CiAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAicHJlY2lzaW9uIjogMjAwMAogIH0sCiAgImRldGVybWluaXN0aWMiOiB0cnVlLAogICJkeW5hbWljIjogZmFsc2UKfQ==\"}","lang":"opensearch_compounded_script","params":{"MISSING_MAX":true,"utcTimestamp": 0,"SOURCES":[0,2,2],"DIGESTS":["lastname","(?<lastname>^[A-Z])","lastname"]}},"type":"string","order":"asc"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$6], dir0=[ASC])
EnumerableCalc(expr#0..13=[{inputs}], expr#14=[1], expr#15=[<=($t13, $t14)], proj#0..12=[{exprs}], $condition=[$t15])
EnumerableWindow(window#0=[window(partition {6} order by [6] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
EnumerableCalc(expr#0..12=[{inputs}], expr#13=[IS NOT NULL($t6)], proj#0..12=[{exprs}], $condition=[$t13])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1)), LIMIT->50000, SORT->[6]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"lastname":{"terms":{"field":"lastname","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["lastname","account_number","firstname","address","birthdate","gender","city","balance","employer","state","age","email","male"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])
LogicalFilter(condition=[<=($17, 1)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $4 ORDER BY $4)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $4)])
LogicalFilter(condition=[IS NOT NULL($4)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], state=[$3])
LogicalFilter(condition=[<=($4, 1)])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], state=[$3], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $1, $3 ORDER BY $1, $3)])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], state=[$3], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $1, $3)])
LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($3))])
LogicalProject(account_number=[$0], gender=[$4], age=[$8], state=[$7])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])
LogicalFilter(condition=[<=($17, 2)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $4, $7 ORDER BY $4, $7)])
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $4, $7)])
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], state=[$3])
LogicalFilter(condition=[<=($4, 2)])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], state=[$3], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $1, $3 ORDER BY $1, $3)])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], state=[$3], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $1, $3)])
LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($3))])
LogicalProject(account_number=[$0], gender=[$4], age=[$8], state=[$7])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ calcite:
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(account_number=[$0], gender=[$1], age=[$2])
LogicalFilter(condition=[<=($3, 1)])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $1)])
LogicalProject(account_number=[$0], gender=[$1], age=[$2], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $1)])
LogicalFilter(condition=[IS NOT NULL($1)])
LogicalProject(account_number=[$0], gender=[$4], age=[$8])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
Expand Down
Loading
Loading