Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class QueryManagerConfig
private int queryManagerExecutorPoolSize = 5;
private int queryExecutorPoolSize = 1000;
private int maxStateMachineCallbackThreads = 5;
private int maxSplitManagerCallbackThreads = 100;
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated

/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
Expand Down Expand Up @@ -394,6 +395,20 @@ public QueryManagerConfig setMaxStateMachineCallbackThreads(int maxStateMachineC
return this;
}

/**
 * Returns the maximum number of threads allowed to run splits generation
 * callbacks concurrently. The value is constrained by {@code @Min(1)}.
 *
 * @return the configured thread limit
 */
@Min(1)
public int getMaxSplitManagerCallbackThreads()
{
    return this.maxSplitManagerCallbackThreads;
}

/**
 * Sets the maximum number of threads allowed to run splits generation
 * callbacks concurrently; bound to the configuration property
 * {@code query.max-split-manager-callback-threads}.
 *
 * @param maxSplitManagerCallbackThreads the new thread limit
 * @return this config instance, so that setters can be chained
 */
@Config("query.max-split-manager-callback-threads")
@ConfigDescription("The maximum number of threads allowed to run splits generation callbacks concurrently")
public QueryManagerConfig setMaxSplitManagerCallbackThreads(int maxSplitManagerCallbackThreads)
{
    // Store the value as given; the @Min(1) constraint is declared on the getter.
    this.maxSplitManagerCallbackThreads = maxSplitManagerCallbackThreads;

    return this;
}

@NotNull
@MinDuration("1s")
public Duration getRemoteTaskMaxErrorDuration()
Expand Down
111 changes: 88 additions & 23 deletions core/trino-main/src/main/java/io/trino/split/BufferingSplitSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
*/
package io.trino.split;

import com.google.common.util.concurrent.Futures;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.opentelemetry.context.Context;
import io.trino.metadata.Split;
import io.trino.spi.connector.CatalogHandle;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

Expand All @@ -33,10 +38,12 @@ public class BufferingSplitSource
{
private final int bufferSize;
private final SplitSource source;
private final Executor executor;

public BufferingSplitSource(SplitSource source, int bufferSize)
/**
 * Creates a split source that buffers batches obtained from {@code source}.
 *
 * @param source the underlying split source; must not be null
 * @param executor executor handed to the asynchronous batch fetch for running
 *        callbacks on batch futures that are not yet complete; must not be null
 * @param bufferSize upper bound on the minimum number of splits accumulated
 *        per returned batch (combined with the requested max size via {@code Math.min})
 */
public BufferingSplitSource(SplitSource source, Executor executor, int bufferSize)
{
    // Null checks run in parameter order, so the first null argument is the one reported.
    this.source = requireNonNull(source, "source is null");
    this.executor = requireNonNull(executor, "executor is null");

    this.bufferSize = bufferSize;
}

Expand All @@ -50,7 +57,7 @@ public CatalogHandle getCatalogHandle()
public ListenableFuture<SplitBatch> getNextBatch(int maxSize)
{
checkArgument(maxSize > 0, "Cannot fetch a batch of zero size");
return GetNextBatch.fetchNextBatchAsync(source, Math.min(bufferSize, maxSize), maxSize);
return GetNextBatch.fetchNextBatchAsync(source, executor, Math.min(bufferSize, maxSize), maxSize);
}

@Override
Expand All @@ -72,50 +79,108 @@ public Optional<List<Object>> getTableExecuteSplitsInfo()
}

private static class GetNextBatch
extends AbstractFuture<SplitBatch>
{
private final Context context = Context.current();
private final SplitSource splitSource;
private final Executor executor;
private final int min;
private final int max;

@GuardedBy("this")
private final List<Split> splits = new ArrayList<>();
private boolean noMoreSplits;
@GuardedBy("this")
private ListenableFuture<SplitBatch> nextBatchFuture;

public static ListenableFuture<SplitBatch> fetchNextBatchAsync(
SplitSource splitSource,
Executor executor,
int min,
int max)
{
GetNextBatch getNextBatch = new GetNextBatch(splitSource, min, max);
ListenableFuture<Void> future = getNextBatch.fetchSplits();
return Futures.transform(future, ignored -> new SplitBatch(getNextBatch.splits, getNextBatch.noMoreSplits), directExecutor());
GetNextBatch getNextBatch = new GetNextBatch(splitSource, executor, min, max);
getNextBatch.fetchSplits();
return getNextBatch;
}

private GetNextBatch(SplitSource splitSource, int min, int max)
/**
 * Creates a future that completes once at least {@code min} splits have been
 * buffered or the underlying source reports its last batch.
 *
 * @param splitSource source the splits are fetched from; must not be null
 * @param executor executor for callbacks on batch futures that are not yet complete; must not be null
 * @param min number of buffered splits that is sufficient to complete this future
 * @param max upper bound used when sizing requests to the underlying source
 * @throws IllegalArgumentException if {@code min} is greater than {@code max}
 */
private GetNextBatch(SplitSource splitSource, Executor executor, int min, int max)
{
    this.splitSource = requireNonNull(splitSource, "splitSource is null");
    this.executor = requireNonNull(executor, "executor is null");
    if (min > max) {
        throw new IllegalArgumentException("Min splits greater than max splits");
    }
    this.min = min;
    this.max = max;
}

private ListenableFuture<Void> fetchSplits()
private synchronized void fetchSplits()
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
{
if (splits.size() >= min) {
return immediateVoidFuture();
}
ListenableFuture<SplitBatch> future;
checkState(nextBatchFuture == null || nextBatchFuture.isDone(), "nextBatchFuture is expected to be done");

try (var ignored = context.makeCurrent()) {
future = splitSource.getNextBatch(max - splits.size());
}
return Futures.transformAsync(future, splitBatch -> {
splits.addAll(splitBatch.getSplits());
if (splitBatch.isLastBatch()) {
noMoreSplits = true;
return immediateVoidFuture();
nextBatchFuture = splitSource.getNextBatch(max - splits.size());
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
// If the split source returns completed futures, we process them on
// directExecutor without chaining to avoid the overhead of going through separate executor
while (nextBatchFuture.isDone()) {
addCallback(
nextBatchFuture,
new FutureCallback<>()
{
@Override
public void onSuccess(SplitBatch splitBatch)
{
processBatch(splitBatch);
}

@Override
public void onFailure(Throwable throwable)
{
setException(throwable);
}
},
directExecutor());
if (isDone()) {
return;
}
nextBatchFuture = splitSource.getNextBatch(max - splits.size());
}
return fetchSplits();
}, directExecutor());
}

addCallback(
nextBatchFuture,
new FutureCallback<>()
{
@Override
public void onSuccess(SplitBatch splitBatch)
{
synchronized (GetNextBatch.this) {
if (processBatch(splitBatch)) {
return;
}
fetchSplits();
}
}

@Override
public void onFailure(Throwable throwable)
{
setException(throwable);
}
},
executor);
}

/**
 * Adds the splits of {@code splitBatch} to the buffer and, when the buffer
 * holds at least {@code min} splits or the batch is the last one, completes
 * this future with an immutable copy of the buffered splits.
 *
 * @param splitBatch the batch returned by the underlying split source
 * @return {@code true} if this future was given its result (no further
 *         fetching is needed), {@code false} if more splits must be fetched
 */
private synchronized boolean processBatch(SplitBatch splitBatch)
{
    splits.addAll(splitBatch.getSplits());

    boolean lastBatch = splitBatch.isLastBatch();
    boolean enoughBuffered = splits.size() >= min;
    if (!enoughBuffered && !lastBatch) {
        return false;
    }

    // Publish a snapshot so that clearing the buffer below cannot affect the result.
    List<Split> snapshot = ImmutableList.copyOf(splits);
    set(new SplitBatch(snapshot, lastBatch));
    splits.clear();
    return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.split;

import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
Expand All @@ -31,22 +32,27 @@
import io.trino.tracing.TrinoAttributes;

import java.util.Optional;
import java.util.concurrent.Executor;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class SplitManager
{
private final CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider;
private final Tracer tracer;
private final int minScheduleSplitBatchSize;
private final Executor executor;

@Inject
public SplitManager(CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider, Tracer tracer, QueryManagerConfig config)
{
this.splitManagerProvider = requireNonNull(splitManagerProvider, "splitManagerProvider is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.minScheduleSplitBatchSize = config.getMinScheduleSplitBatchSize();
this.executor = new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("splits-manager-callback-%s")), config.getMaxSplitManagerCallbackThreads());
}

public SplitSource getSplits(
Expand Down Expand Up @@ -77,7 +83,7 @@ public SplitSource getSplits(

if (minScheduleSplitBatchSize > 1) {
splitSource = new TracingSplitSource(splitSource, tracer, Optional.empty(), "split-batch");
splitSource = new BufferingSplitSource(splitSource, minScheduleSplitBatchSize);
splitSource = new BufferingSplitSource(splitSource, executor, minScheduleSplitBatchSize);
splitSource = new TracingSplitSource(splitSource, tracer, Optional.of(span), "split-buffer");
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testDefaults()
.setQueryManagerExecutorPoolSize(5)
.setQueryExecutorPoolSize(1000)
.setMaxStateMachineCallbackThreads(5)
.setMaxSplitManagerCallbackThreads(100)
.setRemoteTaskMaxErrorDuration(new Duration(5, MINUTES))
.setRemoteTaskMaxCallbackThreads(1000)
.setQueryExecutionPolicy("phased")
Expand Down Expand Up @@ -132,6 +133,7 @@ public void testExplicitPropertyMappings()
.put("query.manager-executor-pool-size", "11")
.put("query.executor-pool-size", "111")
.put("query.max-state-machine-callback-threads", "112")
.put("query.max-split-manager-callback-threads", "113")
.put("query.remote-task.max-error-duration", "60s")
.put("query.remote-task.max-callback-threads", "10")
.put("query.execution-policy", "foo-bar-execution-policy")
Expand Down Expand Up @@ -204,6 +206,7 @@ public void testExplicitPropertyMappings()
.setQueryManagerExecutorPoolSize(11)
.setQueryExecutorPoolSize(111)
.setMaxStateMachineCallbackThreads(112)
.setMaxSplitManagerCallbackThreads(113)
.setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxCallbackThreads(10)
.setQueryExecutionPolicy("foo-bar-execution-policy")
Expand Down
22 changes: 13 additions & 9 deletions core/trino-main/src/test/java/io/trino/split/MockSplitSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.trino.annotation.NotThreadSafe;
import com.google.errorprone.annotations.ThreadSafe;
import io.trino.metadata.Split;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
Expand All @@ -33,7 +33,7 @@
import static io.trino.split.MockSplitSource.Action.FINISH;
import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE;

@NotThreadSafe
@ThreadSafe
public class MockSplitSource
implements SplitSource
{
Expand All @@ -58,22 +58,22 @@ public MockSplitSource()
{
}

public MockSplitSource setBatchSize(int batchSize)
public synchronized MockSplitSource setBatchSize(int batchSize)
{
    // The batch size may only change while no split depletion action has been configured.
    if (atSplitDepletion != DO_NOTHING) {
        throw new IllegalArgumentException("cannot modify batch size once split completion action is set");
    }

    this.batchSize = batchSize;
    return this;
}

public MockSplitSource increaseAvailableSplits(int count)
public synchronized MockSplitSource increaseAvailableSplits(int count)
{
    // More splits may only be made available while no split depletion action has been configured.
    if (atSplitDepletion != DO_NOTHING) {
        throw new IllegalArgumentException("cannot increase available splits once split completion action is set");
    }

    totalSplits += count;

    // Re-run batch production now that more splits are available.
    doGetNextBatch();
    return this;
}

public MockSplitSource atSplitCompletion(Action action)
public synchronized MockSplitSource atSplitCompletion(Action action)
{
atSplitDepletion = action;
doGetNextBatch();
Expand All @@ -86,9 +86,13 @@ public CatalogHandle getCatalogHandle()
throw new UnsupportedOperationException();
}

private void doGetNextBatch()
private synchronized void doGetNextBatch()
{
checkState(splitsProduced <= totalSplits);
if (nextBatchFuture.isDone()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this end up being an active (busy) loop?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was added to avoid waiting forever when splits are produced after a future is already done.
If some test uses it incorrectly, that test will fail or loop forever.

// if nextBatchFuture is already done, we need to wait until new future is created through getNextBatch to produce splits
return;
}
if (splitsProduced == totalSplits) {
switch (atSplitDepletion) {
case FAIL:
Expand All @@ -111,7 +115,7 @@ private void doGetNextBatch()
}

@Override
public ListenableFuture<SplitBatch> getNextBatch(int maxSize)
public synchronized ListenableFuture<SplitBatch> getNextBatch(int maxSize)
{
checkState(nextBatchFuture.isDone(), "concurrent getNextBatch invocation");
nextBatchFuture = SettableFuture.create();
Expand All @@ -128,7 +132,7 @@ public void close()
}

@Override
public boolean isFinished()
public synchronized boolean isFinished()
{
    // Not finished while some of the available splits have not been produced yet.
    if (splitsProduced != totalSplits) {
        return false;
    }
    // All splits are produced: finished only when the depletion action is FINISH.
    return atSplitDepletion == FINISH;
}
Expand All @@ -139,7 +143,7 @@ public Optional<List<Object>> getTableExecuteSplitsInfo()
return Optional.empty();
}

public int getNextBatchInvocationCount()
public synchronized int getNextBatchInvocationCount()
{
    // Read under the instance lock, like the other accessors of this mock.
    return this.nextBatchInvocationCount;
}
Expand Down
Loading