Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,12 @@ public TaskInfo updateTask(
this::notifyStatusChanged);
taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
needsPlan.set(false);
taskExecution.start();
}
}

if (taskExecution != null) {
Comment thread
arhimondr marked this conversation as resolved.
Outdated
taskExecution.addSplitAssignments(splitAssignments);
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
}
taskExecution.addSplitAssignments(splitAssignments);
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
}
catch (Error e) {
failed(e);
Expand Down
190 changes: 43 additions & 147 deletions core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -42,7 +41,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
Expand All @@ -58,14 +56,17 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Comment thread
arhimondr marked this conversation as resolved.
Outdated
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.trino.SystemSessionProperties.getInitialSplitsPerNode;
import static io.trino.SystemSessionProperties.getMaxDriversPerTask;
Expand Down Expand Up @@ -113,34 +114,10 @@ public class SqlTaskExecution
@GuardedBy("this")
private final Map<PlanNodeId, PendingSplitsForPlanNode> pendingSplitsByPlanNode;

private final Status status;
// number of created Drivers that haven't yet finished
private final AtomicLong remainingDrivers = new AtomicLong();

static SqlTaskExecution createSqlTaskExecution(
TaskStateMachine taskStateMachine,
TaskContext taskContext,
OutputBuffer outputBuffer,
LocalExecutionPlan localExecutionPlan,
TaskExecutor taskExecutor,
Executor notificationExecutor,
SplitMonitor queryMonitor)
{
SqlTaskExecution task = new SqlTaskExecution(
taskStateMachine,
taskContext,
outputBuffer,
localExecutionPlan,
taskExecutor,
queryMonitor,
notificationExecutor);
try (SetThreadName ignored = new SetThreadName("Task-%s", task.getTaskId())) {
// The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
// The call back is accessed from another thread, so this code cannot be placed in the constructor.
task.scheduleDriversForTaskLifeCycle();
return task;
}
}

private SqlTaskExecution(
public SqlTaskExecution(
TaskStateMachine taskStateMachine,
TaskContext taskContext,
OutputBuffer outputBuffer,
Expand Down Expand Up @@ -178,10 +155,6 @@ private SqlTaskExecution(

this.pendingSplitsByPlanNode = this.driverRunnerFactoriesWithSplitLifeCycle.keySet().stream()
.collect(toImmutableMap(identity(), ignore -> new PendingSplitsForPlanNode()));
this.status = new Status(
localExecutionPlan.getDriverFactories().stream()
.map(DriverFactory::getPipelineId)
.collect(toImmutableSet()));
sourceStartOrder = localExecutionPlan.getPartitionedSourceOrder();

checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(partitionedSources),
Expand All @@ -199,6 +172,15 @@ private SqlTaskExecution(
}
}

public void start()
{
try (SetThreadName ignored = new SetThreadName("Task-%s", getTaskId())) {
// The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
// The call back is accessed from another thread, so this code cannot be placed in the constructor.
scheduleDriversForTaskLifeCycle();
}
}

// this is a separate method to ensure that the `this` reference is not leaked during construction
private static TaskHandle createTaskHandle(
TaskStateMachine taskStateMachine,
Expand Down Expand Up @@ -296,11 +278,6 @@ private synchronized Map<PlanNodeId, SplitAssignment> updateSplitAssignments(Lis
}
}

for (DriverSplitRunnerFactory driverSplitRunnerFactory :
Iterables.concat(driverRunnerFactoriesWithSplitLifeCycle.values(), driverRunnerFactoriesWithTaskLifeCycle)) {
driverSplitRunnerFactory.closeDriverFactoryIfFullyCreated();
}

// update maxAcknowledgedSplit
maxAcknowledgedSplit = splitAssignments.stream()
.flatMap(source -> source.getSplits().stream())
Expand Down Expand Up @@ -392,6 +369,7 @@ private void scheduleDriversForTaskLifeCycle()
driverRunnerFactory.noMoreDriverRunner();
verify(driverRunnerFactory.isNoMoreDriverRunner());
}
checkTaskCompletion();
}

private synchronized void enqueueDriverSplitRunner(boolean forceRunSplit, List<DriverSplitRunner> runners)
Expand All @@ -406,7 +384,7 @@ private synchronized void enqueueDriverSplitRunner(boolean forceRunSplit, List<D
DriverSplitRunner splitRunner = runners.get(i);

// record new driver
status.incrementRemainingDriver();
remainingDrivers.incrementAndGet();

Futures.addCallback(finishedFuture, new FutureCallback<Object>()
{
Expand All @@ -415,7 +393,7 @@ public void onSuccess(Object result)
{
try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
// record driver is finished
status.decrementRemainingDriver();
remainingDrivers.decrementAndGet();

checkTaskCompletion();

Expand All @@ -430,7 +408,7 @@ public void onFailure(Throwable cause)
taskStateMachine.failed(cause);

// record driver is finished
status.decrementRemainingDriver();
remainingDrivers.decrementAndGet();

// fire failed event with cause
splitMonitor.splitFailedEvent(taskId, getDriverStats(), cause);
Expand Down Expand Up @@ -477,14 +455,14 @@ private synchronized void checkTaskCompletion()
return;
}

// are there more partition splits expected?
for (DriverSplitRunnerFactory driverSplitRunnerFactory : driverRunnerFactoriesWithSplitLifeCycle.values()) {
if (!driverSplitRunnerFactory.isNoMoreDriverRunner()) {
// are there more drivers expected?
for (DriverSplitRunnerFactory driverSplitRunnerFactory : concat(driverRunnerFactoriesWithTaskLifeCycle, driverRunnerFactoriesWithSplitLifeCycle.values())) {
if (!driverSplitRunnerFactory.isNoMoreDrivers()) {
return;
}
}
// do we still have running tasks?
if (status.getRemainingDriver() != 0) {
if (remainingDrivers.get() != 0) {
return;
}

Expand Down Expand Up @@ -520,7 +498,7 @@ public String toString()
{
return toStringHelper(this)
.add("taskId", taskId)
.add("remainingDrivers", status.getRemainingDriver())
.add("remainingDrivers", remainingDrivers.get())
.add("unpartitionedSplitAssignments", unpartitionedSplitAssignments)
.toString();
}
Expand Down Expand Up @@ -595,7 +573,11 @@ private class DriverSplitRunnerFactory
{
private final DriverFactory driverFactory;
private final PipelineContext pipelineContext;
private boolean closed;

// number of created DriverSplitRunners that haven't created underlying Driver
private final AtomicInteger pendingCreations = new AtomicInteger();
// true if no more DriverSplitRunners will be created
private final AtomicBoolean noMoreDriverRunner = new AtomicBoolean();

private DriverSplitRunnerFactory(DriverFactory driverFactory, boolean partitioned)
{
Expand All @@ -607,7 +589,8 @@ private DriverSplitRunnerFactory(DriverFactory driverFactory, boolean partitione
// The former will take two arguments, and the latter will take one. This will simplify the signature quite a bit.
public DriverSplitRunner createDriverRunner(@Nullable ScheduledSplit partitionedSplit)
{
status.incrementPendingCreation(pipelineContext.getPipelineId());
checkState(!noMoreDriverRunner.get(), "noMoreDriverRunner is set");
pendingCreations.incrementAndGet();
// create driver context immediately so the driver existence is recorded in the stats
// the number of drivers is used to balance work across nodes
long splitWeight = partitionedSplit == null ? 0 : partitionedSplit.getSplit().getSplitWeight().getRawValue();
Expand Down Expand Up @@ -637,33 +620,36 @@ public Driver createDriver(DriverContext driverContext, @Nullable ScheduledSplit
}
}

status.decrementPendingCreation(pipelineContext.getPipelineId());
pendingCreations.decrementAndGet();
closeDriverFactoryIfFullyCreated();

return driver;
}

public void noMoreDriverRunner()
{
status.setNoMoreDriverRunner(pipelineContext.getPipelineId());
noMoreDriverRunner.set(true);
closeDriverFactoryIfFullyCreated();
}

public boolean isNoMoreDriverRunner()
{
return status.isNoMoreDriverRunners(pipelineContext.getPipelineId());
return noMoreDriverRunner.get();
}

public void closeDriverFactoryIfFullyCreated()
{
if (closed) {
if (driverFactory.isNoMoreDrivers()) {
return;
}
if (!isNoMoreDriverRunner() || status.getPendingCreation(pipelineContext.getPipelineId()) != 0) {
return;
if (isNoMoreDriverRunner() && pendingCreations.get() == 0) {
driverFactory.noMoreDrivers();
}
driverFactory.noMoreDrivers();
closed = true;
}

public boolean isNoMoreDrivers()
{
return driverFactory.isNoMoreDrivers();
}

public OptionalInt getDriverInstances()
Expand Down Expand Up @@ -780,94 +766,4 @@ public void stateChanged(BufferState newState)
}
}
}

@ThreadSafe
private static class Status
{
// no more driver runner: true if no more DriverSplitRunners will be created.
// pending creation: number of created DriverSplitRunners that haven't created underlying Driver.
// remaining driver: number of created Drivers that haven't yet finished.
Comment thread
arhimondr marked this conversation as resolved.
Outdated

@GuardedBy("this")
private final int pipelineWithTaskLifeCycleCount;

// For these 3 perX fields, they are populated lazily. If enumeration operations on the
// map can lead to side effects, no new entries can be created after such enumeration has
// happened. Otherwise, the order of entry creation and the enumeration operation will
// lead to different outcome.
@GuardedBy("this")
private final Map<Integer, PerPipelineStatus> perPipeline;
@GuardedBy("this")
int pipelinesWithNoMoreDriverRunners;

@GuardedBy("this")
private int overallRemainingDriver;

public Status(Set<Integer> pipelineIds)
{
int pipelineWithTaskLifeCycleCount = 0;
ImmutableMap.Builder<Integer, PerPipelineStatus> perPipeline = ImmutableMap.builder();
for (int pipelineId : pipelineIds) {
perPipeline.put(pipelineId, new PerPipelineStatus());
pipelineWithTaskLifeCycleCount++;
}
this.pipelineWithTaskLifeCycleCount = pipelineWithTaskLifeCycleCount;
this.perPipeline = perPipeline.buildOrThrow();
}

public synchronized void setNoMoreDriverRunner(int pipelineId)
{
per(pipelineId).noMoreDriverRunners = true;
pipelinesWithNoMoreDriverRunners++;
}

public synchronized void incrementPendingCreation(int pipelineId)
{
per(pipelineId).pendingCreation++;
}

public synchronized void decrementPendingCreation(int pipelineId)
{
per(pipelineId).pendingCreation--;
}

public synchronized void incrementRemainingDriver()
{
checkState(!(pipelinesWithNoMoreDriverRunners == pipelineWithTaskLifeCycleCount), "Cannot increment remainingDriver. NoMoreSplits is set.");
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
overallRemainingDriver++;
}

public synchronized void decrementRemainingDriver()
{
checkState(overallRemainingDriver > 0, "Cannot decrement remainingDriver. Value is 0.");
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
overallRemainingDriver--;
}

public synchronized int getPendingCreation(int pipelineId)
{
return per(pipelineId).pendingCreation;
}

public synchronized int getRemainingDriver()
{
return overallRemainingDriver;
}

public synchronized boolean isNoMoreDriverRunners(int pipelineId)
{
return per(pipelineId).noMoreDriverRunners;
}

@GuardedBy("this")
private PerPipelineStatus per(int pipelineId)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier updates to different pipeline status could not be done in parallel, now they can be (which appears to be a good thing), was this level of synchronisation unnecessary ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like so, unless there was a necessity when the grouped execution was still there

{
return perPipeline.get(pipelineId);
}
}

private static class PerPipelineStatus
{
int pendingCreation;
boolean noMoreDriverRunners;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.Executor;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static io.trino.execution.SqlTaskExecution.createSqlTaskExecution;
import static java.util.Objects.requireNonNull;

public class SqlTaskExecutionFactory
Expand Down Expand Up @@ -91,13 +90,13 @@ public SqlTaskExecution create(
throw new RuntimeException(e);
}
}
return createSqlTaskExecution(
return new SqlTaskExecution(
taskStateMachine,
taskContext,
outputBuffer,
localExecutionPlan,
taskExecutor,
taskNotificationExecutor,
splitMonitor);
splitMonitor,
taskNotificationExecutor);
}
}
Loading