Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
*/
public interface Scannable {

public Enumerable<@Nullable Object> scan();
public Enumerable<@Nullable Object> scanWithLimit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ protected PreparedResult implement(RelRoot root) {
RelDataType resultType = root.rel.getRowType();
boolean isDml = root.kind.belongsTo(SqlKind.DML);
if (root.rel instanceof Scannable) {
final Bindable bindable = dataContext -> ((Scannable) root.rel).scan();
final Bindable bindable = dataContext -> ((Scannable) root.rel).scanWithLimit();

return new PreparedResultImpl(
resultType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ public void executeWithCalcite(
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(),
settings.getSettingValue(Key.QUERY_SIZE_LIMIT),
queryType);
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode);
RelNode calcitePlan = convertToCalcitePlan(optimized);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalProject(age=[$8])\n LogicalSort(offset=[2], fetch=[10])\n LogicalSort(offset=[1], fetch=[10])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->10, LIMIT->10, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":3,\"size\":8,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=8, pageSize=null, startFrom=3)])\n"
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->10, PROJECT->[age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":3,\"size\":8,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=8, pageSize=null, startFrom=3)])\n"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalProject(age=[$8])\n LogicalSort(fetch=[10])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.common.setting.Settings.Key;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
Expand Down Expand Up @@ -85,6 +86,10 @@ public RelWriter explainTerms(RelWriter pw) {
.itemIf("PushDownContext", explainString, !pushDownContext.isEmpty());
}

protected Integer getQuerySizeLimit() {
return osIndex.getSettings().getSettingValue(Key.QUERY_SIZE_LIMIT);
}

@Override
public double estimateRowCount(RelMetadataQuery mq) {
/*
Expand Down Expand Up @@ -114,7 +119,7 @@ public double estimateRowCount(RelMetadataQuery mq) {
rowCount, RelMdUtil.guessSelectivity((RexNode) action.digest));
break;
case LIMIT:
estimated = ((Integer) action.digest).doubleValue();
estimated = Math.min(rowCount, (Integer) action.digest);
break;
default:
throw new IllegalStateException("Unexpected value: " + action.type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,29 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan")));
}

@Override
public Enumerable<@Nullable Object> scanWithLimit() {
return executeScan(getQuerySizeLimit());
}

public Enumerable<@Nullable Object> scan() {
return executeScan(null);
}

/**
* This Enumerator may be iterated for multiple times, so we need to create opensearch request for
* each time to avoid reusing source builder. That's because the source builder has stats like PIT
* or SearchAfter recorded during previous search.
*/
@Override
public Enumerable<@Nullable Object> scan() {
private Enumerable<@Nullable Object> executeScan(Integer querySizeLimit) {
return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder();
pushDownContext.forEach(action -> action.apply(requestBuilder));
if (querySizeLimit != null && querySizeLimit > 0 && !pushDownContext.isAggregatePushed()) {
requestBuilder.pushDownLimit(querySizeLimit, 0);
}
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
Expand Down
Loading