Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void addInput(ExchangeInput input)
return;
}
ExchangeDataSource dataSource = delegate.get();
boolean inputAdded = false;
if (dataSource == null) {
if (input instanceof DirectExchangeInput) {
DirectExchangeClient client = directExchangeClientSupplier.get(queryId, exchangeId, systemMemoryContext, taskFailureListener, retryPolicy);
Expand All @@ -126,15 +127,18 @@ else if (input instanceof SpoolingExchangeInput) {
ExchangeManager exchangeManager = exchangeManagerRegistry.getExchangeManager();
List<ExchangeSourceHandle> sourceHandles = spoolingExchangeInput.getExchangeSourceHandles();
ExchangeSource exchangeSource = exchangeManager.createSource(sourceHandles);
dataSource = new SpoolingExchangeDataSource(exchangeSource, sourceHandles, systemMemoryContext);
dataSource = new SpoolingExchangeDataSource(exchangeSource, systemMemoryContext);
inputAdded = true;
}
else {
throw new IllegalArgumentException("Unexpected input: " + input);
}
delegate.set(dataSource);
initialized = true;
}
dataSource.addInput(input);
if (!inputAdded) {
dataSource.addInput(input);
}
}

if (initialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,13 @@
*/
package io.trino.exchange;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.OperatorInfo;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;

import java.util.List;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static java.util.Objects.requireNonNull;
Expand All @@ -39,18 +34,13 @@ public class SpoolingExchangeDataSource
// It doesn't have to be declared as volatile as the nullification of this variable doesn't have to be immediately visible to other threads.
// However since close can be called at any moment this variable has to be accessed in a safe way (avoiding "check-then-use").
private ExchangeSource exchangeSource;
private final List<ExchangeSourceHandle> exchangeSourceHandles;
private final LocalMemoryContext systemMemoryContext;
private volatile boolean closed;

public SpoolingExchangeDataSource(
ExchangeSource exchangeSource,
List<ExchangeSourceHandle> exchangeSourceHandles,
LocalMemoryContext systemMemoryContext)
public SpoolingExchangeDataSource(ExchangeSource exchangeSource, LocalMemoryContext systemMemoryContext)
{
// this assignment is expected to be followed by an assignment of a final field to ensure safe publication
this.exchangeSource = requireNonNull(exchangeSource, "exchangeSource is null");
this.exchangeSourceHandles = ImmutableList.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
}

Expand Down Expand Up @@ -96,16 +86,7 @@ public ListenableFuture<Void> isBlocked()
@Override
public void addInput(ExchangeInput input)
{
SpoolingExchangeInput exchangeInput = (SpoolingExchangeInput) input;
// Only a single input is expected when the spooling exchange is used.
// The engine adds the same input to every instance of the ExchangeOperator.
// Since the ExchangeDataSource is shared between ExchangeOperator instances
// the same input may be delivered multiple times.
checkState(
exchangeInput.getExchangeSourceHandles().equals(exchangeSourceHandles),
"exchange input is expected to contain an identical exchangeSourceHandles list: %s != %s",
exchangeInput.getExchangeSourceHandles(),
exchangeSourceHandles);
throw new UnsupportedOperationException("only a single input is expected");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void onFailure(Throwable throwable)
}

@Override
public void addOutputInfoListener(Consumer<QueryOutputInfo> listener)
public void setOutputInfoListener(Consumer<QueryOutputInfo> listener)
{
// DDL does not have an output
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.sql.planner.Plan;

import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

import static java.util.Objects.requireNonNull;
Expand All @@ -41,7 +42,7 @@ public interface QueryExecution

void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener);

void addOutputInfoListener(Consumer<QueryOutputInfo> listener);
void setOutputInfoListener(Consumer<QueryOutputInfo> listener);

void outputTaskFailed(TaskId taskId, Throwable failure);

Expand Down Expand Up @@ -86,23 +87,23 @@ interface QueryExecutionFactory<T extends QueryExecution>
}

/**
* Output schema and buffer URIs for query. The info will always contain column names and types. Buffer locations will always
* contain the full location set, but may be empty. Users of this data should keep a private copy of the seen buffers to
* handle out of order events from the listener. Once noMoreBufferLocations is set the locations will never change, and
* it is guaranteed that all previously sent locations are contained in the buffer locations.
* The info will always contain column names and types.
* The {@code inputsQueue} is shared between {@link QueryOutputInfo} instances.
* It is guaranteed that no new entries will be added to {@code inputsQueue} after {@link QueryOutputInfo}
* with {@link #isNoMoreInputs()} {@code == true} is created.
*/
class QueryOutputInfo
{
private final List<String> columnNames;
private final List<Type> columnTypes;
private final List<ExchangeInput> inputs;
private final Queue<ExchangeInput> inputsQueue;
private final boolean noMoreInputs;

public QueryOutputInfo(List<String> columnNames, List<Type> columnTypes, List<ExchangeInput> inputs, boolean noMoreInputs)
public QueryOutputInfo(List<String> columnNames, List<Type> columnTypes, Queue<ExchangeInput> inputsQueue, boolean noMoreInputs)
{
this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.inputs = ImmutableList.copyOf(requireNonNull(inputs, "inputs is null"));
this.inputsQueue = requireNonNull(inputsQueue, "inputsQueue is null");
Copy link
Copy Markdown
Member

@losipiuk losipiuk Sep 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add:

// inputsQueue is shared between `QueryOutputInfo` instances. 
// It is guaranteed that no new entries will be added to `inputsQueue` after `QueryOutputInfo` with `noMoreInputs==true` is created.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated QueryOutputInfo java doc

this.noMoreInputs = noMoreInputs;
}

Expand All @@ -116,9 +117,15 @@ public List<Type> getColumnTypes()
return columnTypes;
}

public List<ExchangeInput> getInputs()
public void drainInputs(Consumer<ExchangeInput> consumer)
{
return inputs;
while (true) {
ExchangeInput input = inputsQueue.poll();
if (input == null) {
break;
}
consumer.accept(input);
}
}

public boolean isNoMoreInputs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface QueryManager
*
* @throws NoSuchElementException if query does not exist
*/
void addOutputInfoListener(QueryId queryId, Consumer<QueryExecution.QueryOutputInfo> listener)
void setOutputInfoListener(QueryId queryId, Consumer<QueryExecution.QueryOutputInfo> listener)
throws NoSuchElementException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -284,6 +286,7 @@ static QueryStateMachine beginWithTicker(
QUERY_STATE_LOG.debug("Query %s is %s", queryStateMachine.getQueryId(), newState);
if (newState.isDone()) {
queryStateMachine.getSession().getTransactionId().ifPresent(transactionManager::trySetInactive);
queryStateMachine.getOutputManager().setQueryCompleted();
}
});

Expand Down Expand Up @@ -711,9 +714,9 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
operatorStatsSummary.build());
}

public void addOutputInfoListener(Consumer<QueryOutputInfo> listener)
public void setOutputInfoListener(Consumer<QueryOutputInfo> listener)
{
outputManager.addOutputInfoListener(listener);
outputManager.setOutputInfoListener(listener);
}

public void addOutputTaskFailureListener(TaskFailureListener listener)
Expand Down Expand Up @@ -1282,21 +1285,28 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
ImmutableList.of()); // Remove the operator summaries as OperatorInfo (especially DirectExchangeClientStatus) can hold onto a large amount of memory
}

private QueryOutputManager getOutputManager()
{
return outputManager;
}

public static class QueryOutputManager
{
private final Executor executor;

@GuardedBy("this")
private final List<Consumer<QueryOutputInfo>> outputInfoListeners = new ArrayList<>();
private Optional<Consumer<QueryOutputInfo>> listener = Optional.empty();

@GuardedBy("this")
private List<String> columnNames;
@GuardedBy("this")
private List<Type> columnTypes;
@GuardedBy("this")
private final List<ExchangeInput> inputs = new ArrayList<>();
@GuardedBy("this")
private boolean noMoreInputs;
@GuardedBy("this")
private boolean queryCompleted;

private final Queue<ExchangeInput> inputsQueue = new ConcurrentLinkedQueue<>();

@GuardedBy("this")
private final Map<TaskId, Throwable> outputTaskFailures = new HashMap<>();
Expand All @@ -1308,16 +1318,17 @@ public QueryOutputManager(Executor executor)
this.executor = requireNonNull(executor, "executor is null");
}

public void addOutputInfoListener(Consumer<QueryOutputInfo> listener)
public void setOutputInfoListener(Consumer<QueryOutputInfo> listener)
{
requireNonNull(listener, "listener is null");

Optional<QueryOutputInfo> queryOutputInfo;
synchronized (this) {
outputInfoListeners.add(listener);
checkState(this.listener.isEmpty(), "listener is already set");
this.listener = Optional.of(listener);
queryOutputInfo = getQueryOutputInfo();
}
queryOutputInfo.ifPresent(info -> executor.execute(() -> listener.accept(info)));
fireStateChangedIfReady(queryOutputInfo, Optional.of(listener));
}

public void setColumns(List<String> columnNames, List<Type> columnTypes)
Expand All @@ -1327,33 +1338,45 @@ public void setColumns(List<String> columnNames, List<Type> columnTypes)
checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must be the same size");

Optional<QueryOutputInfo> queryOutputInfo;
List<Consumer<QueryOutputInfo>> outputInfoListeners;
Optional<Consumer<QueryOutputInfo>> listener;
synchronized (this) {
checkState(this.columnNames == null && this.columnTypes == null, "output fields already set");
this.columnNames = ImmutableList.copyOf(columnNames);
this.columnTypes = ImmutableList.copyOf(columnTypes);

queryOutputInfo = getQueryOutputInfo();
outputInfoListeners = ImmutableList.copyOf(this.outputInfoListeners);
listener = this.listener;
}
queryOutputInfo.ifPresent(info -> fireStateChanged(info, outputInfoListeners));
fireStateChangedIfReady(queryOutputInfo, listener);
}

public void updateInputsForQueryResults(List<ExchangeInput> newInputs, boolean noMoreInputs)
{
requireNonNull(newInputs, "newInputs is null");

Optional<QueryOutputInfo> queryOutputInfo;
List<Consumer<QueryOutputInfo>> outputInfoListeners;
Optional<Consumer<QueryOutputInfo>> listener;
synchronized (this) {
// noMoreInputs can be set more than once
checkState(newInputs.isEmpty() || !this.noMoreInputs, "new inputs added after no more inputs set");
inputs.addAll(newInputs);
this.noMoreInputs = noMoreInputs;
if (!queryCompleted) {
// noMoreInputs can be set more than once
checkState(newInputs.isEmpty() || !this.noMoreInputs, "new inputs added after no more inputs set");
inputsQueue.addAll(newInputs);
this.noMoreInputs = noMoreInputs;
}
queryOutputInfo = getQueryOutputInfo();
outputInfoListeners = ImmutableList.copyOf(this.outputInfoListeners);
listener = this.listener;
}
fireStateChangedIfReady(queryOutputInfo, listener);
}

public synchronized void setQueryCompleted()
{
if (queryCompleted) {
return;
}
queryOutputInfo.ifPresent(info -> fireStateChanged(info, outputInfoListeners));
queryCompleted = true;
inputsQueue.clear();
noMoreInputs = true;
}

public void addOutputTaskFailureListener(TaskFailureListener listener)
Expand Down Expand Up @@ -1387,14 +1410,15 @@ private synchronized Optional<QueryOutputInfo> getQueryOutputInfo()
if (columnNames == null || columnTypes == null) {
return Optional.empty();
}
return Optional.of(new QueryOutputInfo(columnNames, columnTypes, inputs, noMoreInputs));
return Optional.of(new QueryOutputInfo(columnNames, columnTypes, inputsQueue, noMoreInputs));
}

private void fireStateChanged(QueryOutputInfo queryOutputInfo, List<Consumer<QueryOutputInfo>> outputInfoListeners)
private void fireStateChangedIfReady(Optional<QueryOutputInfo> info, Optional<Consumer<QueryOutputInfo>> listener)
{
for (Consumer<QueryOutputInfo> outputInfoListener : outputInfoListeners) {
executor.execute(() -> outputInfoListener.accept(queryOutputInfo));
if (info.isEmpty() || listener.isEmpty()) {
return;
}
executor.execute(() -> listener.get().accept(info.get()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,9 @@ public boolean isDone()
}

@Override
public void addOutputInfoListener(Consumer<QueryOutputInfo> listener)
public void setOutputInfoListener(Consumer<QueryOutputInfo> listener)
{
stateMachine.addOutputInfoListener(listener);
stateMachine.setOutputInfoListener(listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ public List<BasicQueryInfo> getQueries()
}

@Override
public void addOutputInfoListener(QueryId queryId, Consumer<QueryOutputInfo> listener)
public void setOutputInfoListener(QueryId queryId, Consumer<QueryOutputInfo> listener)
{
requireNonNull(listener, "listener is null");

queryTracker.getQuery(queryId).addOutputInfoListener(listener);
queryTracker.getQuery(queryId).setOutputInfoListener(listener);
}

@Override
Expand Down
Loading