Merged
Changes from 2 commits
@@ -56,7 +56,7 @@ private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, Query
this.config = config;
this.querySizeLimit = querySizeLimit;
this.queryType = queryType;
-this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
+this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY, querySizeLimit);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
this.functionProperties = new FunctionProperties(QueryType.PPL);
@@ -16,5 +16,5 @@
*/
public interface Scannable {

-public Enumerable<@Nullable Object> scan();
+public Enumerable<@Nullable Object> scan(Integer querySizeLimit);
}
@@ -37,8 +37,10 @@
import java.time.Instant;
import java.util.Properties;
import java.util.function.Consumer;
+import lombok.RequiredArgsConstructor;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
@@ -113,15 +115,16 @@ public static RelBuilder create(
new OpenSearchRelBuilder(config.getContext(), cluster, relOptSchema));
}

-public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFactory) {
+public static Connection connect(
+    FrameworkConfig config, JavaTypeFactory typeFactory, Integer querySizeLimit) {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
try {
-return new OpenSearchDriver().connect("jdbc:calcite:", info, null, typeFactory);
+return new OpenSearchDriver(querySizeLimit).connect("jdbc:calcite:", info, null, typeFactory);
Collaborator:
Could we append a Limit operator to each query instead of passing the limit through the OpenSearchDriver constructor?

Collaborator (Author):
That's a workable option as well; I listed it as an alternative solution in #3879 (comment).

It has the side effect of changing the original plan when the appended Limit operator cannot be pushed down.

OPTION1: Push down QUERY_SIZE_LIMIT to the final single scan

  • PROS: Doesn't change the plan, and the optimization process is efficient and straightforward
  • CONS: Only improves the restricted case of a single scan

OPTION2: Append a LIMIT operator to the original plan

  • PROS: Improves more cases than the single scan, e.g. Project-Scan, since SortProjectTransposeRule can swap the Limit before the Project
  • CONS: Will change the final plan if the Limit operator cannot be pushed down
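To make the trade-off concrete, here is a minimal, hypothetical Java sketch (not the actual plugin code; `Plan`, `scanWithLimit`, and `appendLimit` are invented names). OPTION1 caps rows inside the scan and leaves the plan shape unchanged, while OPTION2 wraps the plan in an explicit Limit node, which shows up in the EXPLAIN output:

```java
import java.util.List;
import java.util.stream.Stream;

public class LimitPlacementSketch {
    interface Plan {
        Stream<Integer> execute();
        String explain();
    }

    // OPTION1: the scan itself honors querySizeLimit; the plan shape is unchanged.
    static Plan scanWithLimit(List<Integer> rows, Integer querySizeLimit) {
        return new Plan() {
            public Stream<Integer> execute() {
                Stream<Integer> s = rows.stream();
                return querySizeLimit == null ? s : s.limit(querySizeLimit);
            }
            public String explain() { return "Scan"; }
        };
    }

    // OPTION2: wrap the original plan in a Limit operator; EXPLAIN output changes.
    static Plan appendLimit(Plan input, int fetch) {
        return new Plan() {
            public Stream<Integer> execute() { return input.execute().limit(fetch); }
            public String explain() { return "Limit(fetch=" + fetch + ")\n  " + input.explain(); }
        };
    }

    public static void main(String[] args) {
        List<Integer> rows = List.of(1, 2, 3, 4, 5);
        Plan opt1 = scanWithLimit(rows, 3);
        Plan opt2 = appendLimit(scanWithLimit(rows, null), 3);
        System.out.println(opt1.explain()); // Scan
        System.out.println(opt2.explain()); // Limit(fetch=3) over Scan
        System.out.println(opt1.execute().count()); // 3
        System.out.println(opt2.execute().count()); // 3
    }
}
```

Both options return at most three rows, but only OPTION2 changes what `explain()` reports, which is the CONS discussed above.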

Which option do we prefer? @penghuo @LantaoJin

Collaborator (Author):
If we implement eval push down in the future, OPTION2 will lose its PROS, since we could then push down all kinds of Project and leave only a single Scan.

@LantaoJin (Member, Jul 16, 2025):
Option 1 is more like a specific optimization for the query_size setting. I prefer Option 2 with a new LogicalSort subclass such as LogicalQueryLimit. That said, I am OK with both Option 1 and 2.

@penghuo (Collaborator, Jul 16, 2025):
+1 on Option 2.

> CONS: Will change the final plan if Limit operator cannot be push down

Could you elaborate on this? Are you suggesting that adding a LIMIT clause would change the result of the EXPLAIN plan? If that's the concern, I'm okay with it. We explicitly enforce a querySizeLimit for every query anyway.

The latest implementation is better; scanWithLimit is clean to me.

Collaborator (Author):
> Are you suggesting that adding a LIMIT clause would change the result of the EXPLAIN plan?

Yes.

Changing the plan risks being seen as changing the customer's intention, even though we do restrict the final result size.

I was also wondering why Hive and Spark don't do a similar optimization of appending a Limit operator. Reportedly (per an LLM), the current approach of counting rows during the final iteration has other advantages, including:

  1. Plan reuse for cases like pagination. PPL doesn't have such a feature yet, but it may be needed in the future.
  2. Better memory management. This doesn't apply to us yet, since the whole process happens on one coordinator, but it will matter once we move to distributed execution.
  3. Keeping the plan semantically equivalent to the user's SQL.

So for the long term, keeping the plan unchanged makes more sense and is standard practice.

} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -147,13 +150,16 @@ private static <R> R withPrepare(
}
final CalciteServerStatement statement =
connection.createStatement().unwrap(CalciteServerStatement.class);
-return new OpenSearchPrepareImpl().perform(statement, config, typeFactory, action);
+// QUERY_SIZE_LIMIT only takes effect in execution, not in planning.
+return new OpenSearchPrepareImpl(null).perform(statement, config, typeFactory, action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

+@RequiredArgsConstructor
public static class OpenSearchDriver extends Driver {
+private final Integer querySizeLimit;

public Connection connect(
String url, Properties info, CalciteSchema rootSchema, JavaTypeFactory typeFactory)
@@ -171,7 +177,7 @@ public Connection connect(

@Override
protected Function0<CalcitePrepare> createPrepareFactory() {
-return OpenSearchPrepareImpl::new;
+return () -> new OpenSearchPrepareImpl(querySizeLimit);
}
}

@@ -208,7 +214,10 @@ public AggCall avg(boolean distinct, String alias, RexNode operand) {
public static final SqlAggFunction VAR_SAMP_NULLABLE =
new NullableSqlAvgAggFunction(SqlKind.VAR_SAMP);

+@RequiredArgsConstructor
public static class OpenSearchPrepareImpl extends CalcitePrepareImpl {
+private final Integer QUERY_SIZE_LIMIT;

/**
* Similar to {@link CalcitePrepareImpl#perform(CalciteServerStatement, FrameworkConfig,
* Frameworks.BasePrepareAction)}, but with a custom typeFactory.
@@ -263,7 +272,8 @@ protected CalcitePrepareImpl.CalcitePreparingStmt getPreparingStmt(
prefer,
createCluster(planner, new RexBuilder(typeFactory)),
resultConvention,
-createConvertletTable());
+createConvertletTable(),
+QUERY_SIZE_LIMIT);
}
}

@@ -273,17 +283,19 @@ protected CalcitePrepareImpl.CalcitePreparingStmt getPreparingStmt(
*/
public static class OpenSearchCalcitePreparingStmt
extends CalcitePrepareImpl.CalcitePreparingStmt {
+private final Integer QUERY_SIZE_LIMIT;

public OpenSearchCalcitePreparingStmt(
CalcitePrepareImpl prepare,
CalcitePrepare.Context context,
CatalogReader catalogReader,
RelDataTypeFactory typeFactory,
CalciteSchema schema,
-EnumerableRel.Prefer prefer,
+Prefer prefer,
RelOptCluster cluster,
Convention resultConvention,
-SqlRexConvertletTable convertletTable) {
+SqlRexConvertletTable convertletTable,
+Integer querySizeLimit) {
super(
prepare,
context,
@@ -294,6 +306,7 @@ public OpenSearchCalcitePreparingStmt(
cluster,
resultConvention,
convertletTable);
+this.QUERY_SIZE_LIMIT = querySizeLimit;
}

@Override
@@ -302,7 +315,7 @@ protected PreparedResult implement(RelRoot root) {
RelDataType resultType = root.rel.getRowType();
boolean isDml = root.kind.belongsTo(SqlKind.DML);
if (root.rel instanceof Scannable scannable) {
-final Bindable bindable = dataContext -> scannable.scan();
+final Bindable bindable = dataContext -> scannable.scan(QUERY_SIZE_LIMIT);

return new PreparedResultImpl(
resultType,
@@ -98,9 +98,7 @@ public void executeWithCalcite(
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(),
settings.getSettingValue(Key.QUERY_SIZE_LIMIT),
queryType);
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
RelNode relNode = analyze(plan, context);
RelNode optimized = optimize(relNode);
RelNode calcitePlan = convertToCalcitePlan(optimized);
@@ -62,7 +62,7 @@ public void setUpContext() {
when(relBuilder.getRexBuilder()).thenReturn(rexBuilder);
when(rexBuilder.getTypeFactory()).thenReturn(TYPE_FACTORY);
mockedStatic = Mockito.mockStatic(CalciteToolsHelper.class);
-mockedStatic.when(() -> CalciteToolsHelper.connect(any(), any())).thenReturn(connection);
+mockedStatic.when(() -> CalciteToolsHelper.connect(any(), any(), any())).thenReturn(connection);

mockedStatic.when(() -> CalciteToolsHelper.create(any(), any(), any())).thenReturn(relBuilder);

@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalProject(age=[$8])\n LogicalSort(offset=[2], fetch=[10])\n LogicalSort(offset=[1], fetch=[10])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
-"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->10, LIMIT->10, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":3,\"size\":8,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=8, pageSize=null, startFrom=3)])\n"
+"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->10, PROJECT->[age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":3,\"size\":8,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=8, pageSize=null, startFrom=3)])\n"
}
}
}
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalProject(age=[$8])\n LogicalSort(fetch=[10])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
-"physical": "EnumerableLimit(fetch=[10])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
+"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
}
}
@@ -88,7 +88,7 @@ public double estimateRowCount(RelMetadataQuery mq) {
case PROJECT, SORT -> rowCount;
case FILTER -> NumberUtil.multiply(
rowCount, RelMdUtil.guessSelectivity((RexNode) action.digest));
-case LIMIT -> (Integer) action.digest;
+case LIMIT -> Math.min(rowCount, (Integer) action.digest);
}
* estimateRowCountFactor,
(a, b) -> null);
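The `Math.min` change above reflects that a LIMIT can only lower a row-count estimate, never raise it above the input's estimate. A small self-contained sketch of that estimator logic (`limitRowCount` is an invented name, not the plugin's API):

```java
public class RowCountEstimate {
    // A LIMIT caps the estimate: the output can never exceed the input's row count.
    static double limitRowCount(double inputRowCount, int fetch) {
        return Math.min(inputRowCount, fetch);
    }

    public static void main(String[] args) {
        System.out.println(limitRowCount(100.0, 10)); // 10.0 -> the limit is selective
        System.out.println(limitRowCount(5.0, 10));   // 5.0  -> the old code would wrongly report 10
    }
}
```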
@@ -88,12 +88,16 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
* or SearchAfter recorded during previous search.
*/
@Override
-public Enumerable<@Nullable Object> scan() {
+public Enumerable<@Nullable Object> scan(Integer querySizeLimit) {
return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder();
pushDownContext.forEach(action -> action.apply(requestBuilder));
+// For the simple plan with only scan, try to push down querySizeLimit to avoid PIT search
+if (querySizeLimit != null && querySizeLimit > 0 && !pushDownContext.isAggregatePushed()) {
+requestBuilder.pushDownLimit(querySizeLimit, 0);
+}
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
@@ -104,6 +108,10 @@ public Enumerator<Object> enumerator() {
};
}

+public Enumerable<@Nullable Object> scan() {
+return scan(null);
+}
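As a rough illustration of the guard in `scan(Integer querySizeLimit)` above, the standalone sketch below (`shouldPushLimit` is an invented helper, not the plugin's API) mirrors the condition: the limit is pushed down only for plain scans, since once an aggregation has been pushed, the result size no longer tracks the number of scanned documents:

```java
public class PushDownGuard {
    // Push querySizeLimit into the search request only when it is a positive
    // value and no aggregation has been pushed down to the scan.
    static boolean shouldPushLimit(Integer querySizeLimit, boolean aggregatePushed) {
        return querySizeLimit != null && querySizeLimit > 0 && !aggregatePushed;
    }

    public static void main(String[] args) {
        System.out.println(shouldPushLimit(10000, false)); // true
        System.out.println(shouldPushLimit(10000, true));  // false
        System.out.println(shouldPushLimit(null, false));  // false
    }
}
```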

private List<String> getFieldPath() {
return getRowType().getFieldNames().stream()
.map(f -> osIndex.getAliasMapping().getOrDefault(f, f))