Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks)
if (newState == FAILED || newState == ABORTED) {
// don't close buffers for a failed query
// closed buffers signal to upstream tasks that everything finished cleanly
outputBuffer.fail();
outputBuffer.abort();
}
else {
outputBuffer.destroy();
Expand Down Expand Up @@ -488,12 +488,12 @@ public void acknowledgeTaskResults(OutputBufferId bufferId, long sequenceId)
outputBuffer.acknowledge(bufferId, sequenceId);
}

public TaskInfo abortTaskResults(OutputBufferId bufferId)
public TaskInfo destroyTaskResults(OutputBufferId bufferId)
{
requireNonNull(bufferId, "bufferId is null");

log.debug("Aborting task %s output %s", taskId, bufferId);
outputBuffer.abort(bufferId);
outputBuffer.destroy(bufferId);

return getTaskInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.operator.StageExecutionDescriptor;
import io.trino.operator.TaskContext;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;
import io.trino.sql.planner.LocalExecutionPlanner.LocalExecutionPlan;
import io.trino.sql.planner.plan.PlanNodeId;

Expand Down Expand Up @@ -76,6 +77,7 @@
import static io.trino.execution.SqlTaskExecution.SplitsState.FINISHED;
import static io.trino.execution.SqlTaskExecution.SplitsState.NO_MORE_SPLITS;
import static io.trino.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -639,14 +641,28 @@ private synchronized void checkTaskCompletion()
// no more output will be created
outputBuffer.setNoMorePages();

// are there still pages in the output buffer
if (!outputBuffer.isFinished()) {
BufferState bufferState = outputBuffer.getState();
if (!bufferState.isTerminal()) {
taskStateMachine.transitionToFlushing();
return;
}

// Cool! All done!
taskStateMachine.finished();
if (bufferState == BufferState.FINISHED) {
// Cool! All done!
taskStateMachine.finished();
return;
}

if (bufferState == BufferState.FAILED) {
Throwable failureCause = outputBuffer.getFailureCause()
.orElseGet(() -> new TrinoException(GENERIC_INTERNAL_ERROR, "Output buffer is failed but the failure cause is missing"));
taskStateMachine.failed(failureCause);
return;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about ABORTED why is it not expected here? Worth a comment?


// The only terminal state that remains is ABORTED.
// Buffer is expected to be aborted only if the task itself is aborted. In this scenario the following statement is expected to be noop.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this scenario the following statement is expected to be noop.

why? because task is aborted so this line should never execute?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing an aborted task is a noop, as the ABORTED state is a terminal state.

taskStateMachine.failed(new TrinoException(GENERIC_INTERNAL_ERROR, "Unexpected buffer state: " + bufferState));
}

@Override
Expand Down Expand Up @@ -1111,7 +1127,7 @@ public CheckTaskCompletionOnBufferFinish(SqlTaskExecution sqlTaskExecution)
@Override
public void stateChanged(BufferState newState)
{
if (newState == BufferState.FINISHED) {
if (newState.isTerminal()) {
SqlTaskExecution sqlTaskExecution = sqlTaskExecutionReference.get();
if (sqlTaskExecution != null) {
sqlTaskExecution.checkTaskCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,12 @@ public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long
}

@Override
public TaskInfo abortTaskResults(TaskId taskId, OutputBufferId bufferId)
public TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId)
{
requireNonNull(taskId, "taskId is null");
requireNonNull(bufferId, "bufferId is null");

return tasks.getUnchecked(taskId).abortTaskResults(bufferId);
return tasks.getUnchecked(taskId).destroyTaskResults(bufferId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ TaskInfo updateTask(
* NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried.
*/
TaskInfo abortTaskResults(TaskId taskId, OutputBufferId bufferId);
TaskInfo destroyTaskResults(TaskId taskId, OutputBufferId bufferId);

/**
* Adds a state change listener to the specified task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.buffer.ClientBuffer.PagesSupplier;
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;
Expand All @@ -45,12 +44,9 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.execution.buffer.BufferState.FAILED;
import static io.trino.execution.buffer.BufferState.FINISHED;
import static io.trino.execution.buffer.BufferState.FLUSHING;
import static io.trino.execution.buffer.BufferState.NO_MORE_BUFFERS;
import static io.trino.execution.buffer.BufferState.NO_MORE_PAGES;
import static io.trino.execution.buffer.BufferState.OPEN;
import static io.trino.execution.buffer.OutputBuffers.BufferType.ARBITRARY;
import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.trino.execution.buffer.PagesSerde.getSerializedPagePositionCount;
Expand All @@ -77,21 +73,21 @@ public class ArbitraryOutputBuffer
// The index of the first client buffer that should be polled
private final AtomicInteger nextClientBufferIndex = new AtomicInteger(0);

private final StateMachine<BufferState> state;
private final OutputBufferStateMachine stateMachine;
Comment thread
arhimondr marked this conversation as resolved.
Outdated
private final String taskInstanceId;

private final AtomicLong totalPagesAdded = new AtomicLong();
private final AtomicLong totalRowsAdded = new AtomicLong();

public ArbitraryOutputBuffer(
String taskInstanceId,
StateMachine<BufferState> state,
OutputBufferStateMachine stateMachine,
DataSize maxBufferSize,
Supplier<LocalMemoryContext> memoryContextSupplier,
Executor notificationExecutor)
{
this.taskInstanceId = requireNonNull(taskInstanceId, "taskInstanceId is null");
this.state = requireNonNull(state, "state is null");
this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
requireNonNull(maxBufferSize, "maxBufferSize is null");
checkArgument(maxBufferSize.toBytes() > 0, "maxBufferSize must be at least 1");
this.memoryManager = new OutputBufferMemoryManager(
Expand All @@ -105,13 +101,7 @@ public ArbitraryOutputBuffer(
@Override
public void addStateChangeListener(StateChangeListener<BufferState> stateChangeListener)
{
state.addStateChangeListener(stateChangeListener);
}

@Override
public boolean isFinished()
Comment thread
sopel39 marked this conversation as resolved.
Outdated
{
return state.get() == FINISHED;
stateMachine.addStateChangeListener(stateChangeListener);
}

@Override
Expand All @@ -123,7 +113,7 @@ public double getUtilization()
@Override
public boolean isOverutilized()
{
return (memoryManager.getUtilization() >= 0.5) || !state.get().canAddPages();
return (memoryManager.getUtilization() >= 0.5) || !stateMachine.getState().canAddPages();
Comment thread
sopel39 marked this conversation as resolved.
Outdated
}

@Override
Expand All @@ -134,7 +124,7 @@ public OutputBufferInfo getInfo()
//

// always get the state first before any other stats
BufferState state = this.state.get();
BufferState state = stateMachine.getState();

// buffers it a concurrent collection so it is safe to access out side of guard
// in this case we only want a snapshot of the current buffers
Expand Down Expand Up @@ -163,6 +153,12 @@ public OutputBufferInfo getInfo()
infos.build());
}

@Override
public BufferState getState()
{
return stateMachine.getState();
}

@Override
public void setOutputBuffers(OutputBuffers newOutputBuffers)
{
Expand All @@ -172,7 +168,7 @@ public void setOutputBuffers(OutputBuffers newOutputBuffers)
synchronized (this) {
// ignore buffers added after query finishes, which can happen when a query is canceled
// also ignore old versions, which is normal
BufferState state = this.state.get();
BufferState state = stateMachine.getState();
if (state.isTerminal() || outputBuffers.getVersion() >= newOutputBuffers.getVersion()) {
return;
}
Expand All @@ -190,12 +186,11 @@ public void setOutputBuffers(OutputBuffers newOutputBuffers)

// update state if no more buffers is set
if (outputBuffers.isNoMoreBufferIds()) {
this.state.compareAndSet(OPEN, NO_MORE_BUFFERS);
this.state.compareAndSet(NO_MORE_PAGES, FLUSHING);
stateMachine.noMoreBuffers();
}
}

if (!state.get().canAddBuffers()) {
if (!stateMachine.getState().canAddBuffers()) {
noMoreBuffers();
}

Expand All @@ -216,7 +211,7 @@ public void enqueue(List<Slice> pages)

// ignore pages after "no more pages" is set
// this can happen with a limit query
if (!state.get().canAddPages()) {
if (!stateMachine.getState().canAddPages()) {
return;
}

Expand Down Expand Up @@ -287,9 +282,9 @@ public void acknowledge(OutputBufferId bufferId, long sequenceId)
}

@Override
public void abort(OutputBufferId bufferId)
public void destroy(OutputBufferId bufferId)
{
checkState(!Thread.holdsLock(this), "Cannot abort while holding a lock on this");
checkState(!Thread.holdsLock(this), "Cannot destroy while holding a lock on this");
requireNonNull(bufferId, "bufferId is null");

getBuffer(bufferId).destroy();
Expand All @@ -301,8 +296,7 @@ public void abort(OutputBufferId bufferId)
public void setNoMorePages()
{
checkState(!Thread.holdsLock(this), "Cannot set no more pages while holding a lock on this");
state.compareAndSet(OPEN, NO_MORE_PAGES);
state.compareAndSet(NO_MORE_BUFFERS, FLUSHING);
stateMachine.noMorePages();
memoryManager.setNoBlockOnFull();

masterBuffer.setNoMorePages();
Expand All @@ -321,7 +315,7 @@ public void destroy()
checkState(!Thread.holdsLock(this), "Cannot destroy while holding a lock on this");

// ignore destroy if the buffer already in a terminal state.
if (state.setIf(FINISHED, oldState -> !oldState.isTerminal())) {
if (stateMachine.finish()) {
noMoreBuffers();

masterBuffer.destroy();
Expand All @@ -334,10 +328,10 @@ public void destroy()
}

@Override
public void fail()
public void abort()
{
// ignore fail if the buffer already in a terminal state.
if (state.setIf(FAILED, oldState -> !oldState.isTerminal())) {
// ignore abort if the buffer already in a terminal state.
if (stateMachine.abort()) {
memoryManager.setNoBlockOnFull();
forceFreeMemory();
// DO NOT destroy buffers or set no more pages. The coordinator manages the teardown of failed queries.
Expand All @@ -350,6 +344,12 @@ public long getPeakMemoryUsage()
return memoryManager.getPeakMemoryUsage();
}

@Override
public Optional<Throwable> getFailureCause()
{
return stateMachine.getFailureCause();
}

@VisibleForTesting
void forceFreeMemory()
{
Expand All @@ -366,14 +366,14 @@ private synchronized ClientBuffer getBuffer(OutputBufferId id)
// NOTE: buffers are allowed to be created in the FINISHED state because destroy() can move to the finished state
// without a clean "no-more-buffers" message from the scheduler. This happens with limit queries and is ok because
// the buffer will be immediately destroyed.
checkState(state.get().canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");
checkState(stateMachine.getState().canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");

// NOTE: buffers are allowed to be created before they are explicitly declared by setOutputBuffers
// When no-more-buffers is set, we verify that all created buffers have been declared
buffer = new ClientBuffer(taskInstanceId, id, onPagesReleased);

// buffer may have finished immediately before calling this method
if (state.get() == FINISHED) {
if (stateMachine.getState() == FINISHED) {
buffer.destroy();
}

Expand All @@ -400,7 +400,7 @@ private void checkFlushComplete()
// This buffer type assigns each page to a single, arbitrary reader,
// so we don't need to wait for no-more-buffers to finish the buffer.
// Any readers added after finish will simply receive no data.
BufferState state = this.state.get();
BufferState state = stateMachine.getState();
if ((state == FLUSHING) || ((state == NO_MORE_PAGES) && masterBuffer.isEmpty())) {
if (safeGetBuffersSnapshot().stream().allMatch(ClientBuffer::isDestroyed)) {
destroy();
Expand Down
Loading