From 26b01a26155c8ac988d5641299a3ba1baf1826f0 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Tue, 2 Dec 2025 11:46:42 -0800 Subject: [PATCH 1/2] Support timeouts for Calcite queries (#4857) --- .../sql/common/setting/Settings.java | 1 + docs/user/ppl/admin/settings.rst | 36 ++++++++ .../executor/OpenSearchQueryManager.java | 55 +++++++++++- .../planner/rules/AggregateIndexScanRule.java | 5 +- .../planner/rules/DedupPushdownRule.java | 6 +- .../ExpandCollationOnProjectExprRule.java | 5 +- .../planner/rules/FilterIndexScanRule.java | 5 +- .../planner/rules/InterruptibleRelRule.java | 83 +++++++++++++++++++ .../planner/rules/LimitIndexScanRule.java | 5 +- .../planner/rules/ProjectIndexScanRule.java | 5 +- .../planner/rules/RareTopPushdownRule.java | 5 +- .../rules/RelevanceFunctionPushdownRule.java | 6 +- .../rules/SortAggregateMeasureRule.java | 6 +- .../planner/rules/SortExprIndexScanRule.java | 8 +- .../planner/rules/SortIndexScanRule.java | 5 +- .../rules/SortProjectExprTransposeRule.java | 6 +- .../setting/OpenSearchSettings.java | 14 ++++ .../storage/scan/OpenSearchIndexScan.java | 11 +++ .../executor/OpenSearchQueryManagerTest.java | 13 ++- .../plugin/config/OpenSearchPluginModule.java | 4 +- 20 files changed, 239 insertions(+), 45 deletions(-) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/InterruptibleRelRule.java diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 07dad582c15..c1325b67da8 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -27,6 +27,7 @@ public enum Key { /** PPL Settings. */ PPL_ENABLED("plugins.ppl.enabled"), + PPL_QUERY_TIMEOUT("plugins.ppl.query.timeout"), PATTERN_METHOD("plugins.ppl.pattern.method"), PATTERN_MODE("plugins.ppl.pattern.mode"), PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"), diff --git a/docs/user/ppl/admin/settings.rst b/docs/user/ppl/admin/settings.rst index d99cdc6c2d0..ef9eba207fa 100644 --- a/docs/user/ppl/admin/settings.rst +++ b/docs/user/ppl/admin/settings.rst @@ -73,6 +73,42 @@ PPL query:: "status": 400 } +plugins.ppl.query.timeout +========================= + +Description +----------- + +This setting controls the maximum execution time for PPL queries. When a query exceeds this timeout, it will be interrupted and return a timeout error. + +1. The default value is 300s (5 minutes). +2. This setting is node scope. +3. This setting can be updated dynamically. + +Example +------- + +You can configure the query timeout: + +PPL query:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"transient" : {"plugins.ppl.query.timeout" : "60s"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "ppl": { + "query": { + "timeout": "60s" + } + } + } + } + } + plugins.query.memory_limit ========================== diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java index dbe91dc3980..6752fec9190 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java @@ -10,32 +10,79 @@ import java.util.Map; import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; +<<<<<<< HEAD import org.opensearch.client.node.NodeClient; +======= +import org.opensearch.OpenSearchTimeoutException; +>>>>>>> 679d8ca64 (Support timeouts for Calcite queries (#4857)) import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.QueryId; import org.opensearch.sql.executor.QueryManager; import org.opensearch.sql.executor.execution.AbstractPlan; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; /** QueryManager implemented in OpenSearch cluster. */ @RequiredArgsConstructor public class OpenSearchQueryManager implements QueryManager { + private static final Logger LOG = LogManager.getLogger(OpenSearchQueryManager.class); + private final NodeClient nodeClient; private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - + private final Settings settings; @Override public QueryId submit(AbstractPlan queryPlan) { - schedule(nodeClient, () -> queryPlan.execute()); + TimeValue timeout = settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT); + schedule(nodeClient, queryPlan::execute, timeout); return queryPlan.getQueryId(); } - private void schedule(NodeClient client, Runnable task) { + private void schedule(NodeClient client, Runnable task, TimeValue timeout) { ThreadPool threadPool = client.threadPool(); - threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); + + Runnable wrappedTask = + withCurrentContext( + () -> { + final Thread executionThread = Thread.currentThread(); + + Scheduler.ScheduledCancellable timeoutTask = + threadPool.schedule( + () -> { + LOG.warn( + "Query execution timed out after {}. Interrupting execution thread.", + timeout); + executionThread.interrupt(); + }, + timeout, + ThreadPool.Names.GENERIC); + + try { + task.run(); + timeoutTask.cancel(); + // Clear any leftover thread interrupts to keep the thread pool clean + Thread.interrupted(); + } catch (Exception e) { + timeoutTask.cancel(); + + // Special-case handling of timeout-related interruptions + if (Thread.interrupted() || e.getCause() instanceof InterruptedException) { + LOG.error("Query was interrupted due to timeout after {}", timeout); + throw new OpenSearchTimeoutException( + "Query execution timed out after " + timeout); + } + + throw e; + } + }); + + threadPool.schedule(wrappedTask, new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } private Runnable withCurrentContext(final Runnable task) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java index 6b98cc566a8..4c5e024a0c1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java @@ -14,7 +14,6 @@ import java.util.stream.Collectors; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -41,7 +40,7 @@ /** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */ @Value.Enclosing -public class AggregateIndexScanRule extends RelRule { +public class AggregateIndexScanRule extends InterruptibleRelRule { /** Creates a AggregateIndexScanRule. */ protected AggregateIndexScanRule(Config config) { @@ -49,7 +48,7 @@ protected AggregateIndexScanRule(Config config) { } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { if (call.rels.length == 5) { final LogicalAggregate aggregate = call.rel(0); final LogicalProject topProject = call.rel(1); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java index 04ceffa7c84..e10680fd5b1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java @@ -11,7 +11,6 @@ import java.util.stream.Collectors; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; @@ -33,7 +32,7 @@ import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @Value.Enclosing -public class DedupPushdownRule extends RelRule { +public class DedupPushdownRule extends InterruptibleRelRule { private static final Logger LOG = LogManager.getLogger(); protected DedupPushdownRule(Config config) { @@ -41,8 +40,9 @@ protected DedupPushdownRule(Config config) { } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final LogicalProject finalProject = call.rel(0); + // TODO Used when number of duplication is more than 1 final LogicalFilter numOfDedupFilter = call.rel(1); final LogicalProject projectWithWindow = call.rel(2); if (call.rels.length == 5) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java index 2034eb1c6d8..0a8b3ae5f33 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java @@ -11,7 +11,6 @@ import java.util.function.Predicate; import org.apache.calcite.adapter.enumerable.EnumerableProject; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.volcano.AbstractConverter; @@ -44,14 +43,14 @@ */ @Value.Enclosing public class ExpandCollationOnProjectExprRule - extends RelRule { + extends InterruptibleRelRule { protected ExpandCollationOnProjectExprRule(Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final AbstractConverter converter = call.rel(0); final Project project = call.rel(1); final RelTraitSet toTraits = converter.getTraitSet(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java index b0c4f55aa3d..b35c74ac829 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java @@ -7,7 +7,6 @@ import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.logical.LogicalFilter; @@ -18,7 +17,7 @@ /** Planner rule that push a {@link LogicalFilter} down to {@link CalciteLogicalIndexScan} */ @Value.Enclosing -public class FilterIndexScanRule extends RelRule { +public class FilterIndexScanRule extends InterruptibleRelRule { /** Creates a FilterIndexScanRule. */ protected FilterIndexScanRule(Config config) { @@ -26,7 +25,7 @@ protected FilterIndexScanRule(Config config) { } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { if (call.rels.length == 2) { // the ordinary variant final LogicalFilter filter = call.rel(0); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/InterruptibleRelRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/InterruptibleRelRule.java new file mode 100644 index 00000000000..59e94a4757c --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/InterruptibleRelRule.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.rules; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; + +/** + * Base class for OpenSearch planner rules that automatically checks for thread interruption during + * query planning. This ensures that long-running planning operations can be interrupted when a + * query timeout occurs. + * + *

All OpenSearch planner rules should extend this class instead of extending {@link RelRule} + * directly. This provides automatic timeout support without requiring manual interruption checks in + * each rule. + * + *

Example usage: + * + *

{@code
+ * public class MyCustomRule extends InterruptibleRelRule {
+ *   protected MyCustomRule(Config config) {
+ *     super(config);
+ *   }
+ *
+ *   @Override
+ *   protected void onMatchImpl(RelOptRuleCall call) {
+ *     // Rule implementation - interruption is checked automatically
+ *     // before this method is called
+ *   }
+ * }
+ * }
+ * + * @param the configuration type for this rule + */ +public abstract class InterruptibleRelRule extends RelRule { + + /** + * Constructs an InterruptibleRelRule with the given configuration. + * + * @param config the rule configuration + */ + protected InterruptibleRelRule(C config) { + super(config); + } + + /** + * Called when the rule matches. This method checks for thread interruption before delegating to + * the implementation-specific {@link #onMatchImpl(RelOptRuleCall)} method. + * + *

Do not override this method in subclasses. Instead, override {@link + * #onMatchImpl(RelOptRuleCall)}. + * + * @param call the rule call context + * @throws RuntimeException wrapping {@link InterruptedException} if the thread has been + * interrupted + */ + @Override + public final void onMatch(RelOptRuleCall call) { + if (Thread.currentThread().isInterrupted()) { + throw new OpenSearchTimeoutException( + new InterruptedException( + "Query planning interrupted in rule: " + getClass().getSimpleName())); + } + + onMatchImpl(call); + } + + /** + * Implementation-specific match handler. Subclasses must implement this method instead of + * overriding {@link #onMatch(RelOptRuleCall)}. + * + *

This method is called after an automatic interruption check. If the thread has been + * interrupted (due to a timeout), this method will not be called. + * + * @param call the rule call context + */ + protected abstract void onMatchImpl(RelOptRuleCall call); +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java index ce99431fa8b..e9f7b725852 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java @@ -7,7 +7,6 @@ import java.util.Objects; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rex.RexLiteral; @@ -22,14 +21,14 @@ * down to {@link CalciteLogicalIndexScan} */ @Value.Enclosing -public class LimitIndexScanRule extends RelRule { +public class LimitIndexScanRule extends InterruptibleRelRule { protected LimitIndexScanRule(Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final LogicalSort sort = call.rel(0); final CalciteLogicalIndexScan scan = call.rel(1); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java index 629869be547..70f467ee8d0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java @@ -12,7 +12,6 @@ import java.util.Objects; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; @@ -27,7 +26,7 @@ /** Planner rule that push a {@link LogicalProject} down to {@link CalciteLogicalIndexScan} */ @Value.Enclosing -public class ProjectIndexScanRule extends RelRule { +public class ProjectIndexScanRule extends InterruptibleRelRule { /** Creates a ProjectIndexScanRule. */ protected ProjectIndexScanRule(Config config) { @@ -35,7 +34,7 @@ protected ProjectIndexScanRule(Config config) { } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { if (call.rels.length == 2) { // the ordinary variant final LogicalProject project = call.rel(0); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java index 649ce9465bc..06c8dd7f299 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RareTopPushdownRule.java @@ -10,7 +10,6 @@ import java.util.stream.Collectors; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rex.RexCall; @@ -26,14 +25,14 @@ import org.opensearch.sql.opensearch.storage.scan.context.RareTopDigest; @Value.Enclosing -public class RareTopPushdownRule extends RelRule { +public class RareTopPushdownRule extends InterruptibleRelRule { protected RareTopPushdownRule(Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final LogicalFilter filter = call.rel(0); final LogicalProject project = call.rel(1); final CalciteLogicalIndexScan scan = call.rel(2); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java index 6ec968ebc6d..9c96ed0c103 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/RelevanceFunctionPushdownRule.java @@ -9,7 +9,6 @@ import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.SINGLE_FIELD_RELEVANCE_FUNCTION_SET; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.AbstractRelNode; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.logical.LogicalFilter; @@ -27,7 +26,8 @@ * relevance functions are always executed by OpenSearch for optimal performance and functionality. */ @Value.Enclosing -public class RelevanceFunctionPushdownRule extends RelRule { +public class RelevanceFunctionPushdownRule + extends InterruptibleRelRule { /** Creates an RelevanceFunctionPushdownRule. */ protected RelevanceFunctionPushdownRule(Config config) { @@ -35,7 +35,7 @@ protected RelevanceFunctionPushdownRule(Config config) { } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { if (call.rels.length == 2) { final LogicalFilter filter = call.rel(0); final CalciteLogicalIndexScan scan = call.rel(1); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java index 62587a2d430..db96f4550fe 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortAggregateMeasureRule.java @@ -7,7 +7,6 @@ import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.logical.LogicalSort; import org.immutables.value.Value; @@ -17,14 +16,15 @@ import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @Value.Enclosing -public class SortAggregateMeasureRule extends RelRule { +public class SortAggregateMeasureRule + extends InterruptibleRelRule { protected SortAggregateMeasureRule(Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final LogicalSort sort = call.rel(0); final CalciteLogicalIndexScan scan = call.rel(1); CalciteLogicalIndexScan newScan = scan.pushDownSortAggregateMeasure(sort); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortExprIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortExprIndexScanRule.java index 557eb3ce46e..aa2f8289a93 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortExprIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortExprIndexScanRule.java @@ -12,7 +12,6 @@ import java.util.Optional; import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rel.core.Project; @@ -27,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.immutables.value.Value; +import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; @@ -39,14 +39,14 @@ * the OpenSearch level for better performance. */ @Value.Enclosing -public class SortExprIndexScanRule extends RelRule { +public class SortExprIndexScanRule extends InterruptibleRelRule { protected SortExprIndexScanRule(SortExprIndexScanRule.Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final LogicalSort sort = call.rel(0); final LogicalProject project = call.rel(1); final CalciteLogicalIndexScan scan = call.rel(2); @@ -232,7 +232,7 @@ private boolean isSupportedSortScriptType(SqlTypeName sqlTypeName) { /** Rule configuration. */ @Value.Immutable - public interface Config extends RelRule.Config { + public interface Config extends OpenSearchRuleConfig { SortExprIndexScanRule.Config DEFAULT = ImmutableSortExprIndexScanRule.Config.builder() .build() diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java index 86a039cc145..44f98b315bd 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortIndexScanRule.java @@ -7,7 +7,6 @@ import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.core.Sort; import org.immutables.value.Value; import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; @@ -15,14 +14,14 @@ import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; @Value.Enclosing -public class SortIndexScanRule extends RelRule { +public class SortIndexScanRule extends InterruptibleRelRule { protected SortIndexScanRule(Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final Sort sort = call.rel(0); final AbstractCalciteIndexScan scan = call.rel(1); if (sort.getConvention() != scan.getConvention()) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java index fe0fae8e64a..a40ca3877bc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/SortProjectExprTransposeRule.java @@ -12,7 +12,6 @@ import java.util.Optional; import java.util.function.Predicate; import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelCollations; @@ -37,14 +36,15 @@ * push down sort expression script into scan. */ @Value.Enclosing -public class SortProjectExprTransposeRule extends RelRule { +public class SortProjectExprTransposeRule + extends InterruptibleRelRule { protected SortProjectExprTransposeRule(Config config) { super(config); } @Override - public void onMatch(RelOptRuleCall call) { + protected void onMatchImpl(RelOptRuleCall call) { final Sort sort = call.rel(0); final Project project = call.rel(1); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 2d113dbbedd..a8e7c6b3b49 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -86,6 +86,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting PPL_QUERY_TIMEOUT_SETTING = + Setting.positiveTimeSetting( + Key.PPL_QUERY_TIMEOUT.getKeyValue(), + TimeValue.timeValueSeconds(300), + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting PPL_SYNTAX_LEGACY_PREFERRED_SETTING = Setting.boolSetting( Key.PPL_SYNTAX_LEGACY_PREFERRED.getKeyValue(), @@ -395,6 +402,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.PPL_ENABLED, PPL_ENABLED_SETTING, new Updater(Key.PPL_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.PPL_QUERY_TIMEOUT, + PPL_QUERY_TIMEOUT_SETTING, + new Updater(Key.PPL_QUERY_TIMEOUT)); register( settingBuilder, clusterSettings, @@ -671,6 +684,7 @@ public static List> pluginSettings() { .add(SQL_DELETE_ENABLED_SETTING) .add(SQL_PAGINATION_API_SEARCH_AFTER_SETTING) .add(PPL_ENABLED_SETTING) + .add(PPL_QUERY_TIMEOUT_SETTING) .add(PPL_SYNTAX_LEGACY_PREFERRED_SETTING) .add(CALCITE_ENGINE_ENABLED_SETTING) .add(CALCITE_FALLBACK_ALLOWED_SETTING) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index 8a61ba48066..9d3e4001c6a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -13,6 +13,7 @@ import lombok.EqualsAndHashCode; import lombok.ToString; import org.jetbrains.annotations.TestOnly; +import org.opensearch.OpenSearchTimeoutException; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.sql.common.setting.Settings; @@ -71,6 +72,11 @@ public void open() { @Override public boolean hasNext() { + // Check for thread interruption to support query timeout + if (Thread.currentThread().isInterrupted()) { + throw new OpenSearchTimeoutException(new InterruptedException("Query execution interrupted")); + } + // For pagination and limit, we need to limit the return rows count to pageSize or limit size if (queryCount >= maxResponseSize) { return false; @@ -84,6 +90,11 @@ public boolean hasNext() { @Override public ExprValue next() { + // Check for thread interruption to support query timeout + if (Thread.currentThread().isInterrupted()) { + throw new OpenSearchTimeoutException(new InterruptedException("Query execution interrupted")); + } + queryCount++; return iterator.next(); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java index fb490af29c2..205a8349375 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManagerTest.java @@ -20,14 +20,17 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryId; import org.opensearch.sql.executor.QueryService; import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.executor.execution.AbstractPlan; import org.opensearch.sql.executor.execution.QueryPlan; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @ExtendWith(MockitoExtension.class) @@ -47,7 +50,12 @@ class OpenSearchQueryManagerTest { public void submitQuery() { NodeClient nodeClient = mock(NodeClient.class); ThreadPool threadPool = mock(ThreadPool.class); + Settings settings = mock(Settings.class); + Scheduler.ScheduledCancellable mockScheduledTask = mock(Scheduler.ScheduledCancellable.class); + when(nodeClient.threadPool()).thenReturn(threadPool); + when(settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT)) + .thenReturn(TimeValue.timeValueSeconds(60)); AtomicBoolean isRun = new AtomicBoolean(false); AbstractPlan queryPlan = @@ -58,15 +66,16 @@ public void execute() { } }; + // Mock the schedule method to run tasks immediately and return a mock ScheduledCancellable doAnswer( invocation -> { Runnable task = invocation.getArgument(0); task.run(); - return null; + return mockScheduledTask; }) .when(threadPool) .schedule(any(), any(), any()); - new OpenSearchQueryManager(nodeClient).submit(queryPlan); + new OpenSearchQueryManager(nodeClient, settings).submit(queryPlan); assertTrue(isRun.get()); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java index 432ac8499aa..c9a595df376 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java @@ -80,8 +80,8 @@ public PlanSerializer planSerializer(StorageEngine storageEngine) { @Provides @Singleton - public QueryManager queryManager(NodeClient nodeClient) { - return new OpenSearchQueryManager(nodeClient); + public QueryManager queryManager(NodeClient nodeClient, Settings settings) { + return new OpenSearchQueryManager(nodeClient, settings); } @Provides From 6048a03045393055a2526e72b49155dad8df3aa5 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Thu, 4 Dec 2025 17:42:37 +0000 Subject: [PATCH 2/2] Fix borked merge Signed-off-by: Simeon Widdis --- .../sql/opensearch/executor/OpenSearchQueryManager.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java index 6752fec9190..fa4a8826adc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java @@ -13,11 +13,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; -<<<<<<< HEAD import org.opensearch.client.node.NodeClient; -======= import org.opensearch.OpenSearchTimeoutException; ->>>>>>> 679d8ca64 (Support timeouts for Calcite queries (#4857)) import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.QueryId;