Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum Key {

/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),
PPL_QUERY_TIMEOUT("plugins.ppl.query.timeout"),
PATTERN_METHOD("plugins.ppl.pattern.method"),
PATTERN_MODE("plugins.ppl.pattern.mode"),
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),
Expand Down
36 changes: 36 additions & 0 deletions docs/user/ppl/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,42 @@ PPL query::
"status": 400
}

plugins.ppl.query.timeout
=========================

Description
-----------

This setting controls the maximum execution time for PPL queries. When a query exceeds this timeout, it will be interrupted and return a timeout error.

1. The default value is 300s (5 minutes).
2. This setting is node scope.
3. This setting can be updated dynamically.

Example
-------

You can configure the query timeout:

PPL query::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient" : {"plugins.ppl.query.timeout" : "60s"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"ppl": {
"query": {
"timeout": "60s"
}
}
}
}
}

plugins.query.memory_limit
==========================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,76 @@

import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.opensearch.client.node.NodeClient;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryManager;
import org.opensearch.sql.executor.execution.AbstractPlan;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

/** QueryManager implemented in OpenSearch cluster. */
@RequiredArgsConstructor
public class OpenSearchQueryManager implements QueryManager {

private static final Logger LOG = LogManager.getLogger(OpenSearchQueryManager.class);

private final NodeClient nodeClient;

private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private final Settings settings;
@Override
public QueryId submit(AbstractPlan queryPlan) {
schedule(nodeClient, () -> queryPlan.execute());
TimeValue timeout = settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT);
schedule(nodeClient, queryPlan::execute, timeout);

return queryPlan.getQueryId();
}

private void schedule(NodeClient client, Runnable task) {
private void schedule(NodeClient client, Runnable task, TimeValue timeout) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);

Runnable wrappedTask =
withCurrentContext(
() -> {
final Thread executionThread = Thread.currentThread();

Scheduler.ScheduledCancellable timeoutTask =
threadPool.schedule(
() -> {
LOG.warn(
"Query execution timed out after {}. Interrupting execution thread.",
timeout);
executionThread.interrupt();
},
timeout,
ThreadPool.Names.GENERIC);

try {
task.run();
timeoutTask.cancel();
// Clear any leftover thread interrupts to keep the thread pool clean
Thread.interrupted();
} catch (Exception e) {
timeoutTask.cancel();

// Special-case handling of timeout-related interruptions
if (Thread.interrupted() || e.getCause() instanceof InterruptedException) {
LOG.error("Query was interrupted due to timeout after {}", timeout);
throw new OpenSearchTimeoutException(
"Query execution timed out after " + timeout);
}

throw e;
}
});

threadPool.schedule(wrappedTask, new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
}

private Runnable withCurrentContext(final Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
Expand All @@ -41,15 +40,15 @@

/** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */
@Value.Enclosing
public class AggregateIndexScanRule extends RelRule<AggregateIndexScanRule.Config> {
public class AggregateIndexScanRule extends InterruptibleRelRule<AggregateIndexScanRule.Config> {

/** Creates a AggregateIndexScanRule. */
protected AggregateIndexScanRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
if (call.rels.length == 5) {
final LogicalAggregate aggregate = call.rel(0);
final LogicalProject topProject = call.rel(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.stream.Collectors;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
Expand All @@ -33,16 +32,17 @@
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

@Value.Enclosing
public class DedupPushdownRule extends RelRule<DedupPushdownRule.Config> {
public class DedupPushdownRule extends InterruptibleRelRule<DedupPushdownRule.Config> {
private static final Logger LOG = LogManager.getLogger();

protected DedupPushdownRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
final LogicalProject finalProject = call.rel(0);
// TODO Used when number of duplication is more than 1
final LogicalFilter numOfDedupFilter = call.rel(1);
final LogicalProject projectWithWindow = call.rel(2);
if (call.rels.length == 5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.function.Predicate;
import org.apache.calcite.adapter.enumerable.EnumerableProject;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.AbstractConverter;
Expand Down Expand Up @@ -44,14 +43,14 @@
*/
@Value.Enclosing
public class ExpandCollationOnProjectExprRule
extends RelRule<ExpandCollationOnProjectExprRule.Config> {
extends InterruptibleRelRule<ExpandCollationOnProjectExprRule.Config> {

protected ExpandCollationOnProjectExprRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
final AbstractConverter converter = call.rel(0);
final Project project = call.rel(1);
final RelTraitSet toTraits = converter.getTraitSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.logical.LogicalFilter;
Expand All @@ -18,15 +17,15 @@

/** Planner rule that push a {@link LogicalFilter} down to {@link CalciteLogicalIndexScan} */
@Value.Enclosing
public class FilterIndexScanRule extends RelRule<FilterIndexScanRule.Config> {
public class FilterIndexScanRule extends InterruptibleRelRule<FilterIndexScanRule.Config> {

/** Creates a FilterIndexScanRule. */
protected FilterIndexScanRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final LogicalFilter filter = call.rel(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.planner.rules;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig;

/**
* Base class for OpenSearch planner rules that automatically checks for thread interruption during
* query planning. This ensures that long-running planning operations can be interrupted when a
* query timeout occurs.
*
* <p>All OpenSearch planner rules should extend this class instead of extending {@link RelRule}
* directly. This provides automatic timeout support without requiring manual interruption checks in
* each rule.
*
* <p>Example usage:
*
* <pre>{@code
* public class MyCustomRule extends InterruptibleRelRule<MyCustomRule.Config> {
* protected MyCustomRule(Config config) {
* super(config);
* }
*
* @Override
* protected void onMatchImpl(RelOptRuleCall call) {
* // Rule implementation - interruption is checked automatically
* // before this method is called
* }
* }
* }</pre>
*
* @param <C> the configuration type for this rule
*/
public abstract class InterruptibleRelRule<C extends OpenSearchRuleConfig> extends RelRule<C> {

/**
* Constructs an InterruptibleRelRule with the given configuration.
*
* @param config the rule configuration
*/
protected InterruptibleRelRule(C config) {
super(config);
}

/**
* Called when the rule matches. This method checks for thread interruption before delegating to
* the implementation-specific {@link #onMatchImpl(RelOptRuleCall)} method.
*
* <p>Do not override this method in subclasses. Instead, override {@link
* #onMatchImpl(RelOptRuleCall)}.
*
* @param call the rule call context
* @throws RuntimeException wrapping {@link InterruptedException} if the thread has been
* interrupted
*/
@Override
public final void onMatch(RelOptRuleCall call) {
if (Thread.currentThread().isInterrupted()) {
throw new OpenSearchTimeoutException(
new InterruptedException(
"Query planning interrupted in rule: " + getClass().getSimpleName()));
}

onMatchImpl(call);
}

/**
* Implementation-specific match handler. Subclasses must implement this method instead of
* overriding {@link #onMatch(RelOptRuleCall)}.
*
* <p>This method is called after an automatic interruption check. If the thread has been
* interrupted (due to a timeout), this method will not be called.
*
* @param call the rule call context
*/
protected abstract void onMatchImpl(RelOptRuleCall call);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.util.Objects;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rex.RexLiteral;
Expand All @@ -22,14 +21,14 @@
* down to {@link CalciteLogicalIndexScan}
*/
@Value.Enclosing
public class LimitIndexScanRule extends RelRule<LimitIndexScanRule.Config> {
public class LimitIndexScanRule extends InterruptibleRelRule<LimitIndexScanRule.Config> {

protected LimitIndexScanRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
final LogicalSort sort = call.rel(0);
final CalciteLogicalIndexScan scan = call.rel(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.Objects;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
Expand All @@ -27,15 +26,15 @@

/** Planner rule that push a {@link LogicalProject} down to {@link CalciteLogicalIndexScan} */
@Value.Enclosing
public class ProjectIndexScanRule extends RelRule<ProjectIndexScanRule.Config> {
public class ProjectIndexScanRule extends InterruptibleRelRule<ProjectIndexScanRule.Config> {

/** Creates a ProjectIndexScanRule. */
protected ProjectIndexScanRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final LogicalProject project = call.rel(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.stream.Collectors;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexCall;
Expand All @@ -26,14 +25,14 @@
import org.opensearch.sql.opensearch.storage.scan.context.RareTopDigest;

@Value.Enclosing
public class RareTopPushdownRule extends RelRule<RareTopPushdownRule.Config> {
public class RareTopPushdownRule extends InterruptibleRelRule<RareTopPushdownRule.Config> {

protected RareTopPushdownRule(Config config) {
super(config);
}

@Override
public void onMatch(RelOptRuleCall call) {
protected void onMatchImpl(RelOptRuleCall call) {
final LogicalFilter filter = call.rel(0);
final LogicalProject project = call.rel(1);
final CalciteLogicalIndexScan scan = call.rel(2);
Expand Down
Loading
Loading