Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ public ListenableFuture<Void> processFor(Duration duration)
driver = this.driver;
}

return driver.processFor(duration);
return driver.processForDuration(duration);
}

@Override
Expand Down
241 changes: 123 additions & 118 deletions core/trino-main/src/main/java/io/trino/operator/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.Boolean.TRUE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

//
// NOTE: As a general strategy the methods should "stage" a change and only
Expand All @@ -66,6 +67,8 @@ public class Driver
{
private static final Logger log = Logger.get(Driver.class);

private static final Duration UNLIMITED_DURATION = new Duration(Long.MAX_VALUE, NANOSECONDS);

private final DriverContext driverContext;
private final List<Operator> activeOperators;
// this is present only for debugging
Expand Down Expand Up @@ -268,33 +271,70 @@ private void processNewSources()
currentSplitAssignment = newAssignment;
}

public ListenableFuture<Void> processFor(Duration duration)
public ListenableFuture<Void> processForDuration(Duration duration)
{
return process(duration, Integer.MAX_VALUE);
}

public ListenableFuture<Void> processForNumberOfIterations(int maxIterations)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is always called with 1; maybe replace with processOnce()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about that. It feels like processOnce is less intuitive. Though I don't have a strong opinion here.

{
return process(UNLIMITED_DURATION, maxIterations);
}

public ListenableFuture<Void> processUntilBlocked()
{
return process(UNLIMITED_DURATION, Integer.MAX_VALUE);
}

@VisibleForTesting
public ListenableFuture<Void> process(Duration maxRuntime, int maxIterations)
{
checkLockNotHeld("Cannot process for a duration while holding the driver lock");

requireNonNull(duration, "duration is null");
requireNonNull(maxRuntime, "maxRuntime is null");
checkArgument(maxIterations > 0, "maxIterations must be greater than zero");

// if the driver is blocked we don't need to continue
SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}

long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
long maxRuntimeInNanos = maxRuntime.roundTo(TimeUnit.NANOSECONDS);

Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> {
OperationTimer operationTimer = createTimer();
driverContext.startProcessTimer();
driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
driverContext.getYieldSignal().setWithDelay(maxRuntimeInNanos, driverContext.getYieldExecutor());
try {
long start = System.nanoTime();
do {
int iterations = 0;
while (!isFinishedInternal()) {
ListenableFuture<Void> future = processInternal(operationTimer);
iterations++;
if (!future.isDone()) {
return updateDriverBlockedFuture(future);
}
if (System.nanoTime() - start >= maxRuntimeInNanos || iterations >= maxIterations) {
break;
}
}
}
catch (Throwable t) {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;
}
while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());

// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
}
finally {
driverContext.getYieldSignal().reset();
Expand All @@ -305,23 +345,6 @@ public ListenableFuture<Void> processFor(Duration duration)
return result.orElse(NOT_BLOCKED);
}

public ListenableFuture<Void> process()
{
checkLockNotHeld("Cannot process while holding the driver lock");

// if the driver is blocked we don't need to continue
SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}

Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, true, () -> {
ListenableFuture<Void> future = processInternal(createTimer());
return updateDriverBlockedFuture(future);
});
return result.orElse(NOT_BLOCKED);
}

private OperationTimer createTimer()
{
return new OperationTimer(
Expand Down Expand Up @@ -359,119 +382,101 @@ private ListenableFuture<Void> processInternal(OperationTimer operationTimer)

handleMemoryRevoke();

try {
processNewSources();

// If there is only one operator, finish it
// Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously
// TODO remove the second part of the if statement, when these operators are fixed
// Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early
if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
Operator rootOperator = activeOperators.get(0);
rootOperator.finish();
rootOperator.getOperatorContext().recordFinish(operationTimer);
}
processNewSources();

boolean movedPage = false;
for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) {
Operator current = activeOperators.get(i);
Operator next = activeOperators.get(i + 1);
// If there is only one operator, finish it
// Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously
// TODO remove the second part of the if statement, when these operators are fixed
// Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early
if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
Operator rootOperator = activeOperators.get(0);
rootOperator.finish();
rootOperator.getOperatorContext().recordFinish(operationTimer);
}

// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
continue;
}
boolean movedPage = false;
for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) {
Operator current = activeOperators.get(i);
Operator next = activeOperators.get(i + 1);

// if the current operator is not finished and next operator isn't blocked and needs input...
if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) {
// get an output page from current operator
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(operationTimer, page);

// if we got an output page, add it to the next operator
if (page != null && page.getPositionCount() != 0) {
next.addInput(page);
next.getOperatorContext().recordAddInput(operationTimer, page);
movedPage = true;
}
// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
continue;
}

if (current instanceof SourceOperator) {
movedPage = true;
}
// if the current operator is not finished and next operator isn't blocked and needs input...
if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) {
// get an output page from current operator
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(operationTimer, page);

// if we got an output page, add it to the next operator
if (page != null && page.getPositionCount() != 0) {
next.addInput(page);
next.getOperatorContext().recordAddInput(operationTimer, page);
movedPage = true;
}

// if current operator is finished...
if (current.isFinished()) {
// let next operator know there will be no more data
next.finish();
next.getOperatorContext().recordFinish(operationTimer);
if (current instanceof SourceOperator) {
movedPage = true;
}
}

for (int index = activeOperators.size() - 1; index >= 0; index--) {
if (activeOperators.get(index).isFinished()) {
// close and remove this operator and all source operators
List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
Throwable throwable = closeAndDestroyOperators(finishedOperators);
finishedOperators.clear();
if (throwable != null) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}
// Finish the next operator, which is now the first operator.
if (!activeOperators.isEmpty()) {
Operator newRootOperator = activeOperators.get(0);
newRootOperator.finish();
newRootOperator.getOperatorContext().recordFinish(operationTimer);
}
break;
}
// if current operator is finished...
if (current.isFinished()) {
// let next operator know there will be no more data
next.finish();
next.getOperatorContext().recordFinish(operationTimer);
}
}

// if we did not move any pages, check if we are blocked
if (!movedPage) {
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<Void>> blockedFutures = new ArrayList<>();
for (Operator operator : activeOperators) {
Optional<ListenableFuture<Void>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
for (int index = activeOperators.size() - 1; index >= 0; index--) {
if (activeOperators.get(index).isFinished()) {
// close and remove this operator and all source operators
List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
Throwable throwable = closeAndDestroyOperators(finishedOperators);
finishedOperators.clear();
if (throwable != null) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}

if (!blockedFutures.isEmpty()) {
// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
// Finish the next operator, which is now the first operator.
if (!activeOperators.isEmpty()) {
Operator newRootOperator = activeOperators.get(0);
newRootOperator.finish();
newRootOperator.getOperatorContext().recordFinish(operationTimer);
}
break;
}

return NOT_BLOCKED;
}
catch (Throwable t) {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;

// if we did not move any pages, check if we are blocked
if (!movedPage) {
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<Void>> blockedFutures = new ArrayList<>();
for (Operator operator : activeOperators) {
Optional<ListenableFuture<Void>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
}

// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
if (!blockedFutures.isEmpty()) {
// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
}
}

return NOT_BLOCKED;
}

@GuardedBy("exclusiveLock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.StatisticAggregationsDescriptor;
import io.trino.util.AutoCloseableCloser;

import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -360,7 +361,10 @@ private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicRef
public void close()
throws Exception
{
statisticsAggregationOperator.close();
AutoCloseableCloser closer = AutoCloseableCloser.create();
closer.register(() -> statisticsAggregationOperator.getOperatorContext().destroy());
closer.register(statisticsAggregationOperator);
closer.close();
}

public interface TableFinisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ public void close()
closer.register(pageSink::abort);
}
}
closer.register(() -> statisticAggregationOperator.getOperatorContext().destroy());
closer.register(statisticAggregationOperator);
closer.register(pageSinkMemoryContext::close);
closer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public boolean load(List<UpdateRequest> requests)
ScheduledSplit split = new ScheduledSplit(0, sourcePlanNodeId, new Split(INDEX_CONNECTOR_ID, new IndexSplit(recordSetForLookupSource), Lifespan.taskWide()));
driver.updateSplitAssignment(new SplitAssignment(sourcePlanNodeId, ImmutableSet.of(split), true));
while (!driver.isFinished()) {
ListenableFuture<Void> process = driver.process();
ListenableFuture<Void> process = driver.processUntilBlocked();
checkState(process.isDone(), "Driver should never block");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private boolean loadNextPage()
if (driver.isFinished()) {
return false;
}
driver.process();
driver.processForNumberOfIterations(1);
nextPage = extractNonEmptyPage(pageBuffer);
}
currentPage = nextPage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ private MaterializedResultWithPlan executeInternal(Session session, @Language("S
}

if (!driver.isFinished()) {
driver.process();
driver.processForNumberOfIterations(1);
processed = true;
}
}
Expand Down
Loading