Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import io.trino.spi.QueryId;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -116,29 +113,22 @@ public void addInput(ExchangeInput input)
return;
}
ExchangeDataSource dataSource = delegate.get();
boolean inputAdded = false;
if (dataSource == null) {
if (input instanceof DirectExchangeInput) {
DirectExchangeClient client = directExchangeClientSupplier.get(queryId, exchangeId, systemMemoryContext, taskFailureListener, retryPolicy);
dataSource = new DirectExchangeDataSource(client);
}
else if (input instanceof SpoolingExchangeInput) {
SpoolingExchangeInput spoolingExchangeInput = (SpoolingExchangeInput) input;
ExchangeManager exchangeManager = exchangeManagerRegistry.getExchangeManager();
List<ExchangeSourceHandle> sourceHandles = spoolingExchangeInput.getExchangeSourceHandles();
ExchangeSource exchangeSource = exchangeManager.createSource(sourceHandles);
dataSource = new SpoolingExchangeDataSource(exchangeSource, systemMemoryContext);
inputAdded = true;
dataSource = new SpoolingExchangeDataSource(exchangeManager.createSource(), systemMemoryContext);
}
else {
throw new IllegalArgumentException("Unexpected input: " + input);
}
delegate.set(dataSource);
initialized = true;
}
if (!inputAdded) {
dataSource.addInput(input);
}
dataSource.addInput(input);
}

if (initialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,22 @@ public ListenableFuture<Void> isBlocked()
@Override
public void addInput(ExchangeInput input)
{
throw new UnsupportedOperationException("only a single input is expected");
SpoolingExchangeInput spoolingExchangeInput = (SpoolingExchangeInput) input;
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return;
}
exchangeSource.addSourceHandles(spoolingExchangeInput.getExchangeSourceHandles());
}

@Override
public void noMoreInputs()
{
// Only a single input is expected when the spooling exchange is used.
// Thus the assumption of "noMoreSplit" is made on construction.
ExchangeSource exchangeSource = this.exchangeSource;
if (exchangeSource == null) {
return;
}
exchangeSource.noMoreSourceHandles();
}

@Override
Expand Down
10 changes: 3 additions & 7 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,6 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks)
});
}

public boolean isOutputBufferOverutilized()
{
return outputBuffer.isOverutilized();
}

public SqlTaskIoStats getIoStats()
{
return taskHolderReference.get().getIoStats();
Expand Down Expand Up @@ -322,7 +317,8 @@ else if (taskHolder.getTaskExecution() != null) {
dynamicFiltersVersion = taskContext.getDynamicFiltersVersion();
}

return new TaskStatus(taskStateMachine.getTaskId(),
return new TaskStatus(
taskStateMachine.getTaskId(),
taskInstanceId,
versionNumber,
state,
Expand All @@ -331,7 +327,7 @@ else if (taskHolder.getTaskExecution() != null) {
failures,
queuedPartitionedDrivers,
runningPartitionedDrivers,
isOutputBufferOverutilized(),
outputBuffer.getStatus(),
physicalWrittenDataSize,
userMemoryReservation,
peakUserMemoryReservation,
Expand Down
15 changes: 8 additions & 7 deletions core/trino-main/src/main/java/io/trino/execution/TaskStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.execution.buffer.OutputBufferStatus;

import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class TaskStatus
private final long queuedPartitionedSplitsWeight;
private final int runningPartitionedDrivers;
private final long runningPartitionedSplitsWeight;
private final boolean outputBufferOverutilized;
private final OutputBufferStatus outputBufferStatus;
private final DataSize physicalWrittenDataSize;
private final DataSize memoryReservation;
private final DataSize peakMemoryReservation;
Expand All @@ -79,7 +80,7 @@ public TaskStatus(
@JsonProperty("failures") List<ExecutionFailureInfo> failures,
@JsonProperty("queuedPartitionedDrivers") int queuedPartitionedDrivers,
@JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers,
@JsonProperty("outputBufferOverutilized") boolean outputBufferOverutilized,
@JsonProperty("outputBufferStatus") OutputBufferStatus outputBufferStatus,
@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,
@JsonProperty("memoryReservation") DataSize memoryReservation,
@JsonProperty("peakMemoryReservation") DataSize peakMemoryReservation,
Expand Down Expand Up @@ -109,7 +110,7 @@ public TaskStatus(
checkArgument(runningPartitionedSplitsWeight >= 0, "runningPartitionedSplitsWeight must be positive");
this.runningPartitionedSplitsWeight = runningPartitionedSplitsWeight;

this.outputBufferOverutilized = outputBufferOverutilized;
this.outputBufferStatus = requireNonNull(outputBufferStatus, "outputBufferStatus is null");

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");

Expand Down Expand Up @@ -186,9 +187,9 @@ public DataSize getPhysicalWrittenDataSize()
}

@JsonProperty
public boolean isOutputBufferOverutilized()
public OutputBufferStatus getOutputBufferStatus()
{
return outputBufferOverutilized;
return outputBufferStatus;
}

@JsonProperty
Expand Down Expand Up @@ -260,7 +261,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n
ImmutableList.of(),
0,
0,
false,
OutputBufferStatus.initial(),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
Expand All @@ -284,7 +285,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List<E
exceptions,
taskStatus.getQueuedPartitionedDrivers(),
taskStatus.getRunningPartitionedDrivers(),
taskStatus.isOutputBufferOverutilized(),
taskStatus.getOutputBufferStatus(),
taskStatus.getPhysicalWrittenDataSize(),
taskStatus.getMemoryReservation(),
taskStatus.getPeakMemoryReservation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class ArbitraryOutputBuffer
private final PagesReleasedListener onPagesReleased;

@GuardedBy("this")
private OutputBuffers outputBuffers = createInitialEmptyOutputBuffers(ARBITRARY);
private volatile OutputBuffers outputBuffers = createInitialEmptyOutputBuffers(ARBITRARY);

private final MasterBuffer masterBuffer;

Expand Down Expand Up @@ -112,9 +112,12 @@ public double getUtilization()
}

@Override
public boolean isOverutilized()
public OutputBufferStatus getStatus()
{
return (memoryManager.getUtilization() >= 0.5) || !stateMachine.getState().canAddPages();
// do not grab lock to acquire outputBuffers to avoid delaying TaskStatus response
return OutputBufferStatus.builder(outputBuffers.getVersion())
.setOverutilized(memoryManager.getUtilization() >= 0.5 || !stateMachine.getState().canAddPages())
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class BroadcastOutputBuffer
private final PagesReleasedListener onPagesReleased;

@GuardedBy("this")
private OutputBuffers outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(BROADCAST);
private volatile OutputBuffers outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(BROADCAST);

@GuardedBy("this")
private final Map<OutputBufferId, ClientBuffer> buffers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -112,9 +112,12 @@ public double getUtilization()
}

@Override
public boolean isOverutilized()
public OutputBufferStatus getStatus()
{
return (getUtilization() > 0.5) && stateMachine.getState().canAddPages();
// do not grab lock to acquire outputBuffers to avoid delaying TaskStatus response
return OutputBufferStatus.builder(outputBuffers.getVersion())
.setOverutilized(getUtilization() > 0.5 && stateMachine.getState().canAddPages())
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ public double getUtilization()
}

@Override
public boolean isOverutilized()
public OutputBufferStatus getStatus()
{
OutputBuffer outputBuffer = getDelegateOutputBuffer();

// until output buffer is initialized, readers cannot enqueue and thus cannot be blocked
return (outputBuffer != null) && outputBuffer.isOverutilized();
if (outputBuffer == null) {
return OutputBufferStatus.initial();
}
return outputBuffer.getStatus();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public interface OutputBuffer
double getUtilization();

/**
* Check if the buffer is blocking producers.
* Get buffer status
*/
boolean isOverutilized();
OutputBufferStatus getStatus();

/**
* Add a listener which fires anytime the buffer state changes.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.buffer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

public class OutputBufferStatus
{
private static final OutputBufferStatus INITIAL = new OutputBufferStatus(OptionalLong.empty(), false, false);

private final OptionalLong outputBuffersVersion;
private final boolean overutilized;
private final boolean exchangeSinkInstanceHandleUpdateRequired;

@JsonCreator
public OutputBufferStatus(
@JsonProperty("outputBuffersVersion") OptionalLong outputBuffersVersion,
@JsonProperty("overutilized") boolean overutilized,
@JsonProperty("exchangeSinkInstanceHandleUpdateRequired") boolean exchangeSinkInstanceHandleUpdateRequired)
{
this.outputBuffersVersion = requireNonNull(outputBuffersVersion, "outputBuffersVersion is null");
this.overutilized = overutilized;
this.exchangeSinkInstanceHandleUpdateRequired = exchangeSinkInstanceHandleUpdateRequired;
}

@JsonProperty
public OptionalLong getOutputBuffersVersion()
{
return outputBuffersVersion;
}

@JsonProperty
public boolean isOverutilized()
{
return overutilized;
}

@JsonProperty
public boolean isExchangeSinkInstanceHandleUpdateRequired()
{
return exchangeSinkInstanceHandleUpdateRequired;
}

public static OutputBufferStatus initial()
{
return INITIAL;
}

public static Builder builder(long outputBuffersVersion)
{
return new Builder(outputBuffersVersion);
}

public static class Builder
{
private final OptionalLong outputBuffersVersion;
private boolean overutilized;
private boolean exchangeSinkInstanceHandleUpdateRequired;

public Builder(long outputBuffersVersion)
{
this.outputBuffersVersion = OptionalLong.of(outputBuffersVersion);
}

public Builder setOverutilized(boolean overutilized)
{
this.overutilized = overutilized;
return this;
}

public Builder setExchangeSinkInstanceHandleUpdateRequired(boolean exchangeSinkInstanceHandleUpdateRequired)
{
this.exchangeSinkInstanceHandleUpdateRequired = exchangeSinkInstanceHandleUpdateRequired;
return this;
}

public OutputBufferStatus build()
{
return new OutputBufferStatus(outputBuffersVersion, overutilized, exchangeSinkInstanceHandleUpdateRequired);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ public double getUtilization()
}

@Override
public boolean isOverutilized()
public OutputBufferStatus getStatus()
{
return memoryManager.isOverutilized();
return OutputBufferStatus.builder(outputBuffers.getVersion())
.setOverutilized(memoryManager.isOverutilized())
.build();
}

@Override
Expand Down
Loading