Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
Expand All @@ -21,6 +20,8 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

/**
* The job runner class for scheduling async query.
*
Expand All @@ -37,7 +38,7 @@
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
// Share SQL plugin thread pool
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
SQL_WORKER_THREAD_POOL_NAME;
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);

private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.time.Instant;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
Expand Down Expand Up @@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.node.NodeClient;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

/** The scheduler which schedule the task run in sql-worker thread pool. */
/** The scheduler which schedule the task run in sql_worker thread pool. */
@UtilityClass
public class Scheduler {

public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

public static void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/query-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ Parser parse raw query as Statement and create AbstractPlan. Each AbstractPlan d
### Change of existing logic
1. Remove the schedule logic in NIO thread. After the change,
a. Parser will be executed in NIO thread.
b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql-worker** thread pool.
b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql_worker** thread pool.
21 changes: 21 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ Result set::
"transient": {}
}

Thread Pool Settings
====================

The SQL plugin is integrated with the `OpenSearch Thread Pool Settings <https://docs.opensearch.org/latest/install-and-configure/configuring-opensearch/thread-pool-settings/>`.
There are two thread pools which can be configured on cluster setup via `settings.yml`::

thread_pool:
sql_worker:
size: 30
queue_size: 100
sql_background_io:
size: 30
queue_size: 1000

The ``sql_worker`` pool corresponds to compute resources related to running queries, such as compute-heavy evaluations on result sets.
This directly maps to the number of queries that can be run concurrently.
This is the primary pool you interact with externally.
``sql_background_io`` is a low-footprint pool for IO requests the plugin makes,
and can be used to partially the search load SQL places on your cluster for some types of expensive operations.
A ``sql_worker`` thread may spawn multiple background threads.

plugins.query.executionengine.spark.session.limit
==================================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand All @@ -30,10 +32,6 @@

/** A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread. */
public class AsyncRestExecutor implements RestExecutor {

/** Custom thread pool name managed by OpenSearch */
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand All @@ -24,9 +26,6 @@
import org.opensearch.transport.client.Client;

public class CursorAsyncRestExecutor {
/** Custom thread pool name managed by OpenSearch */
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class);

/** Delegated rest executor to async */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.plugin;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
Expand Down Expand Up @@ -90,7 +91,7 @@ protected Set<String> responseParams() {

private void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker");
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
}

private Runnable withCurrentContext(final Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void cleanup(OpenSearchRequest request) {

@Override
public void schedule(Runnable task) {
// at that time, task already running the sql-worker ThreadPool.
// at that time, task already running the sql_worker ThreadPool.
task.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class OpenSearchQueryManager implements QueryManager {

private final NodeClient nodeClient;

private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql_worker";
public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io";

@Override
public QueryId submit(AbstractPlan queryPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

package org.opensearch.sql.opensearch.storage.scan;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.calcite.linq4j.Enumerator;
Expand All @@ -31,6 +36,8 @@ public class OpenSearchIndexEnumerator implements Enumerator<Object> {
/** OpenSearch client. */
private final OpenSearchClient client;

private final Executor backgroundExecutor;

private final List<String> fields;

/** Search request. */
Expand All @@ -56,6 +63,9 @@ public class OpenSearchIndexEnumerator implements Enumerator<Object> {

private ExprValue current;

private CompletableFuture<OpenSearchResponse> nextBatchFuture;
private boolean isLastBatch = false;

/** flag to indicate whether fetch more than one batch */
private boolean fetchOnce = false;

Expand All @@ -66,7 +76,6 @@ public OpenSearchIndexEnumerator(
int maxResultWindow,
OpenSearchRequest request,
ResourceMonitor monitor) {
this.client = client;
this.fields = fields;
this.request = request;
this.maxResponseSize = maxResponseSize;
Expand All @@ -77,21 +86,41 @@ public OpenSearchIndexEnumerator(
if (!this.monitor.isHealthy()) {
throw new NonFallbackCalciteException("insufficient resources to run the query, quit.");
}

this.client = client;
this.backgroundExecutor =
client.getNodeClient().threadPool().executor(SQL_BACKGROUND_THREAD_POOL_NAME);
this.nextBatchFuture =
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
}

private void fetchNextBatch() {
OpenSearchResponse response = client.search(request);
if (response.isAggregationResponse()
|| response.isCountResponse()
|| response.getHitsSize() < maxResultWindow) {
// no need to fetch next batch if it's for an aggregation
// or the length of response hits is less than max result window size.
fetchOnce = true;
}
if (!response.isEmpty()) {
iterator = response.iterator();
} else if (iterator == null) {
iterator = Collections.emptyIterator();
try {
OpenSearchResponse response = nextBatchFuture.get();
// Start by determining whether we actually need future batches
if (response.isAggregationResponse()
|| response.isCountResponse()
|| response.getHitsSize() < maxResultWindow) {
// No need to fetch next batch if it's for an aggregation
// or the length of response hits is less than max result window size.
fetchOnce = true;
}
if (!response.isEmpty()) {
iterator = response.iterator();

// If we haven't hit the end, start pre-fetching next batch
if (!isLastBatch && !fetchOnce) {
nextBatchFuture =
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
}
} else {
if (iterator == null) {
iterator = Collections.emptyIterator();
}
isLastBatch = true;
}
} catch (InterruptedException | ExecutionException e) {
throw new NonFallbackCalciteException("Error fetching batch: " + e.getMessage());
}
}

Expand All @@ -113,6 +142,7 @@ private Object resolveForCalcite(ExprValue value, String rawPath) {
@Override
public boolean moveNext() {
if (queryCount >= maxResponseSize) {
isLastBatch = true;
return false;
}

Expand All @@ -129,24 +159,34 @@ public boolean moveNext() {
queryCount++;
return true;
} else {
isLastBatch = true;
return false;
}
}

@Override
public void reset() {
isLastBatch = false;
nextBatchFuture =
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
OpenSearchResponse response = client.search(request);
if (!response.isEmpty()) {
iterator = response.iterator();
} else {
iterator = Collections.emptyIterator();
isLastBatch = true;
}
queryCount = 0;
}

@Override
public void close() {
iterator = Collections.emptyIterator();
queryCount = 0;
isLastBatch = true;
if (nextBatchFuture != null) {
nextBatchFuture.cancel(true);
}
if (request != null) {
client.forceCleanup(request);
request = null;
Expand Down
20 changes: 15 additions & 5 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

package org.opensearch.sql.plugin;

import static java.util.Collections.singletonList;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -74,7 +75,6 @@
import org.opensearch.sql.datasources.transport.TransportPatchDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.plugin.RestSqlAction;
import org.opensearch.sql.legacy.plugin.RestSqlStatsAction;
Expand Down Expand Up @@ -277,13 +277,23 @@ public ScheduledJobParser getJobParser() {

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return singletonList(
// The worker pool is the primary pool where most of the work is done. The background thread
// pool is a separate queue for asynchronous requests to other nodes. We keep them separate to
// prevent deadlocks during async fetches on small node counts. Tasks in the background pool
// should do no work except I/O to other services.
return List.of(
new FixedExecutorBuilder(
settings,
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME,
SQL_WORKER_THREAD_POOL_NAME,
OpenSearchExecutors.allocatedProcessors(settings),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this is the best number.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally it should match the number of search threads since that's where all the requests go, maybe I can find where that number is stored and do a lookup.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to pull the search thread pool count if available, otherwise fallback to node processors. This is what it looks like if you limit the search thread pool under heavy load:

image

Intuitively this seems like a pretty informative view of what state the cluster's in regarding SQL queries.

1000,
null));
"thread_pool." + SQL_WORKER_THREAD_POOL_NAME),
new FixedExecutorBuilder(
settings,
SQL_BACKGROUND_THREAD_POOL_NAME,
OpenSearchExecutors.allocatedProcessors(settings),
100, // No external API connects to this, we should have sane resource usage
"thread_pool." + SQL_BACKGROUND_THREAD_POOL_NAME));
}

@Override
Expand Down
Loading