diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/Scannable.java b/core/src/main/java/org/opensearch/sql/calcite/plan/Scannable.java index 2ec341f2eaa..aa759ba3aab 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/Scannable.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/Scannable.java @@ -16,5 +16,5 @@ */ public interface Scannable { - public Enumerable<@Nullable Object> scan(); + public Enumerable<@Nullable Object> scanWithLimit(); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index 08e680238f4..e055489bcd5 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -302,7 +302,7 @@ protected PreparedResult implement(RelRoot root) { RelDataType resultType = root.rel.getRowType(); boolean isDml = root.kind.belongsTo(SqlKind.DML); if (root.rel instanceof Scannable) { - final Bindable bindable = dataContext -> ((Scannable) root.rel).scan(); + final Bindable bindable = dataContext -> ((Scannable) root.rel).scanWithLimit(); return new PreparedResultImpl( resultType, diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index ef8876a9275..d36267595b5 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -98,9 +98,7 @@ public void executeWithCalcite( () -> { CalcitePlanContext context = CalcitePlanContext.create( - buildFrameworkConfig(), - settings.getSettingValue(Key.QUERY_SIZE_LIMIT), - queryType); + buildFrameworkConfig(), getQuerySizeLimit(), queryType); RelNode relNode = analyze(plan, context); RelNode optimized = optimize(relNode); RelNode calcitePlan = convertToCalcitePlan(optimized); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_10from1_10from2_push.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_10from1_10from2_push.json index 827a22c74c9..4e8166dd6d2 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_10from1_10from2_push.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_10from1_10from2_push.json @@ -1,6 +1,6 @@ { "calcite": { "logical": "LogicalProject(age=[$8])\n LogicalSort(offset=[2], fetch=[10])\n LogicalSort(offset=[1], fetch=[10])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->10, LIMIT->10, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":3,\"size\":8,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=8, pageSize=null, startFrom=3)])\n" + "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->10, PROJECT->[age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":3,\"size\":8,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=8, pageSize=null, startFrom=3)])\n" } -} \ No newline at end of file +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_5_10_push.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_5_10_push.json index 7c0ea4aeec1..05a4b3815d2 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_5_10_push.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_5_10_push.json @@ -1,6 +1,6 @@ { "calcite": { "logical": "LogicalProject(age=[$8])\n LogicalSort(fetch=[10])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", - "physical": "EnumerableLimit(fetch=[10])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n" + "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n" } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index 2430f6bbee6..55d8a0a6a4d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -37,6 +37,7 @@ import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; @@ -85,6 +86,10 @@ public RelWriter explainTerms(RelWriter pw) { .itemIf("PushDownContext", explainString, !pushDownContext.isEmpty()); } + protected Integer getQuerySizeLimit() { + return osIndex.getSettings().getSettingValue(Key.QUERY_SIZE_LIMIT); + } + @Override public double estimateRowCount(RelMetadataQuery mq) { /* @@ -114,7 +119,7 @@ public double estimateRowCount(RelMetadataQuery mq) { rowCount, RelMdUtil.guessSelectivity((RexNode) action.digest)); break; case LIMIT: - estimated = ((Integer) action.digest).doubleValue(); + estimated = Math.min(rowCount, (Integer) action.digest); break; default: throw new IllegalStateException("Unexpected value: " + action.type); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java index 1d9161d6153..667a806989a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java @@ -98,18 +98,29 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) { return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); } + @Override + public Enumerable<@Nullable Object> scanWithLimit() { + return executeScan(getQuerySizeLimit()); + } + + public Enumerable<@Nullable Object> scan() { + return executeScan(null); + } + /** * This Enumerator may be iterated for multiple times, so we need to create opensearch request for * each time to avoid reusing source builder. That's because the source builder has stats like PIT * or SearchAfter recorded during previous search. */ - @Override - public Enumerable<@Nullable Object> scan() { + private Enumerable<@Nullable Object> executeScan(Integer querySizeLimit) { return new AbstractEnumerable<>() { @Override public Enumerator enumerator() { OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder(); pushDownContext.forEach(action -> action.apply(requestBuilder)); + if (querySizeLimit != null && querySizeLimit > 0 && !pushDownContext.isAggregatePushed()) { + requestBuilder.pushDownLimit(querySizeLimit, 0); + } return new OpenSearchIndexEnumerator( osIndex.getClient(), getFieldPath(),