Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import io.trino.exchange.SpoolingExchangeInput;
Expand Down Expand Up @@ -131,7 +132,7 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List<Spli
partitionAssignment.getPartitionId(),
planNodeId,
false,
splits,
singleSourcePartition(SINGLE_SOURCE_PARTITION_ID, splits),
noMoreSplits));
}
if (noMoreSplits) {
Expand All @@ -157,15 +158,15 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List<Spli
0,
replicatedSourceId,
false,
replicatedSplits.get(replicatedSourceId),
singleSourcePartition(SINGLE_SOURCE_PARTITION_ID, replicatedSplits.get(replicatedSourceId)),
true));
}
for (PlanNodeId partitionedSourceId : partitionedSources) {
assignment.updatePartition(new PartitionUpdate(
0,
partitionedSourceId,
false,
ImmutableList.of(),
ImmutableListMultimap.of(),
true));
}
assignment.sealPartition(0);
Expand All @@ -179,7 +180,7 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List<Spli
partitionAssignment.getPartitionId(),
partitionedSourceNodeId,
false,
ImmutableList.of(),
singleSourcePartition(0, ImmutableList.of()),
true));
}
// seal partition
Expand All @@ -194,6 +195,13 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List<Spli
return assignment.build();
}

private ListMultimap<Integer, Split> singleSourcePartition(int sourcePartitionId, List<Split> splits)
{
ImmutableListMultimap.Builder<Integer, Split> builder = ImmutableListMultimap.builder();
builder.putAll(0, splits);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REPLICATED_SOURCE_PARTITION is also 0

document why 0 is ok here or give the constant a name

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

USed:

// marker source partition id for data which is not hash distributed
    int SINGLE_SOURCE_PARTITION_ID = 0;

return builder.build();
}

private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Split> splits, boolean noMoreSplits)
{
AssignmentResult.Builder assignment = AssignmentResult.builder();
Expand All @@ -210,7 +218,7 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Spl
partitionAssignment.getPartitionId(),
partitionedSourceNodeId,
false,
ImmutableList.of(),
ImmutableListMultimap.of(),
true));
}
if (completedSources.containsAll(replicatedSources)) {
Expand Down Expand Up @@ -241,15 +249,15 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Spl
partitionAssignment.getPartitionId(),
replicatedSourceId,
false,
replicatedSplits.get(replicatedSourceId),
singleSourcePartition(SINGLE_SOURCE_PARTITION_ID, replicatedSplits.get(replicatedSourceId)),
completedSources.contains(replicatedSourceId)));
}
}
assignment.updatePartition(new PartitionUpdate(
partitionAssignment.getPartitionId(),
planNodeId,
true,
ImmutableList.of(split),
singleSourcePartition(SINGLE_SOURCE_PARTITION_ID, ImmutableList.of(split)),
false));
partitionAssignment.assignSplit(splitSizeInBytes);
}
Expand All @@ -268,15 +276,15 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Spl
0,
replicatedSourceId,
false,
replicatedSplits.get(replicatedSourceId),
singleSourcePartition(SINGLE_SOURCE_PARTITION_ID, replicatedSplits.get(replicatedSourceId)),
true));
}
for (PlanNodeId partitionedSourceId : partitionedSources) {
assignment.updatePartition(new PartitionUpdate(
0,
partitionedSourceId,
false,
ImmutableList.of(),
ImmutableListMultimap.of(),
true));
}

Expand All @@ -290,7 +298,7 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Spl
partitionAssignment.getPartitionId(),
partitionedSourceNodeId,
false,
ImmutableList.of(),
ImmutableListMultimap.of(),
true));
}
// seal partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
Expand Down Expand Up @@ -1700,21 +1701,21 @@ public void addPartition(int partitionId, NodeRequirements nodeRequirements)
}

public Optional<PrioritizedScheduledTask> updatePartition(
int partitionId,
int taskPartitionId,
PlanNodeId planNodeId,
boolean readyForScheduling,
List<Split> splits,
ListMultimap<Integer, Split> splits, // sourcePartitionId -> splits
boolean noMoreSplits)
{
if (getState().isDone()) {
return Optional.empty();
}

StagePartition partition = getStagePartition(partitionId);
StagePartition partition = getStagePartition(taskPartitionId);
partition.addSplits(planNodeId, splits, noMoreSplits);
if (readyForScheduling && !partition.isTaskScheduled()) {
partition.setTaskScheduled(true);
return Optional.of(PrioritizedScheduledTask.createSpeculative(stage.getStageId(), partitionId, schedulingPriority, eager));
return Optional.of(PrioritizedScheduledTask.createSpeculative(stage.getStageId(), taskPartitionId, schedulingPriority, eager));
}
return Optional.empty();
}
Expand Down Expand Up @@ -1823,7 +1824,7 @@ public Optional<RemoteTask> schedule(int partitionId, ExchangeSinkInstanceHandle
Map<PlanNodeId, ExchangeSourceOutputSelector> outputSelectors = getSourceOutputSelectors();

ListMultimap<PlanNodeId, Split> splits = ArrayListMultimap.create();
splits.putAll(partition.getSplits());
splits.putAll(partition.getSplits().getSplitsFlat());
outputSelectors.forEach((planNodeId, outputSelector) -> splits.put(planNodeId, createOutputSelectorSplit(outputSelector)));

Set<PlanNodeId> noMoreSplits = new HashSet<>();
Expand Down Expand Up @@ -2014,6 +2015,11 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
runningPartitions.remove(partitionId);
}

if (!remainingPartitions.contains(partitionId)) {
// another task for this partition finished successfully
return ImmutableList.of();
}

RuntimeException failure = failureInfo.toException();
ErrorCode errorCode = failureInfo.getErrorCode();
partitionMemoryEstimator.registerPartitionFinished(
Expand Down Expand Up @@ -2214,7 +2220,7 @@ public StagePartition(
this.exchangeSinkHandle = requireNonNull(exchangeSinkHandle, "exchangeSinkHandle is null");
this.remoteSourceIds = ImmutableSet.copyOf(requireNonNull(remoteSourceIds, "remoteSourceIds is null"));
requireNonNull(nodeRequirements, "nodeRequirements is null");
this.openTaskDescriptor = Optional.of(new OpenTaskDescriptor(ImmutableListMultimap.of(), ImmutableSet.of(), nodeRequirements));
this.openTaskDescriptor = Optional.of(new OpenTaskDescriptor(SplitsMapping.EMPTY, ImmutableSet.of(), nodeRequirements));
this.memoryRequirements = requireNonNull(memoryRequirements, "memoryRequirements is null");
this.remainingAttempts = maxTaskExecutionAttempts;
}
Expand All @@ -2224,7 +2230,7 @@ public ExchangeSinkHandle getExchangeSinkHandle()
return exchangeSinkHandle;
}

public void addSplits(PlanNodeId planNodeId, List<Split> splits, boolean noMoreSplits)
public void addSplits(PlanNodeId planNodeId, ListMultimap<Integer, Split> splits, boolean noMoreSplits)
{
checkState(openTaskDescriptor.isPresent(), "openTaskDescriptor is empty");
openTaskDescriptor = Optional.of(openTaskDescriptor.get().update(planNodeId, splits, noMoreSplits));
Expand All @@ -2233,7 +2239,7 @@ public void addSplits(PlanNodeId planNodeId, List<Split> splits, boolean noMoreS
}
for (RemoteTask task : tasks.values()) {
task.addSplits(ImmutableListMultimap.<PlanNodeId, Split>builder()
.putAll(planNodeId, splits)
.putAll(planNodeId, splits.values())
.build());
if (noMoreSplits && isFinalOutputSelectorDelivered(planNodeId)) {
task.noMoreSplits(planNodeId);
Expand Down Expand Up @@ -2270,15 +2276,15 @@ public void seal()
}
}

public ListMultimap<PlanNodeId, Split> getSplits()
public SplitsMapping getSplits()
{
if (finished) {
return ImmutableListMultimap.of();
return SplitsMapping.EMPTY;
}
return openTaskDescriptor.map(OpenTaskDescriptor::getSplits)
.or(() -> taskDescriptorStorage.get(stageId, partitionId).map(TaskDescriptor::getSplits))
// execution is finished
.orElse(ImmutableListMultimap.of());
.orElse(SplitsMapping.EMPTY);
}

public boolean isNoMoreSplits(PlanNodeId planNodeId)
Expand Down Expand Up @@ -2436,18 +2442,25 @@ private static Split createOutputSelectorSplit(ExchangeSourceOutputSelector sele

private static class OpenTaskDescriptor
{
private final ListMultimap<PlanNodeId, Split> splits;
private final SplitsMapping splits;
private final Set<PlanNodeId> noMoreSplits;
private final NodeRequirements nodeRequirements;

private OpenTaskDescriptor(ListMultimap<PlanNodeId, Split> splits, Set<PlanNodeId> noMoreSplits, NodeRequirements nodeRequirements)
private OpenTaskDescriptor(SplitsMapping splits, Set<PlanNodeId> noMoreSplits, NodeRequirements nodeRequirements)
{
this.splits = ImmutableListMultimap.copyOf(requireNonNull(splits, "splits is null"));
this.splits = requireNonNull(splits, "splits is null");
this.noMoreSplits = ImmutableSet.copyOf(requireNonNull(noMoreSplits, "noMoreSplits is null"));
this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null");
}

public ListMultimap<PlanNodeId, Split> getSplits()
private static Map<PlanNodeId, ListMultimap<Integer, Split>> copySplits(Map<PlanNodeId, ListMultimap<Integer, Split>> splits)
{
ImmutableMap.Builder<PlanNodeId, ListMultimap<Integer, Split>> splitsBuilder = ImmutableMap.builder();
splits.forEach((planNodeId, planNodeSplits) -> splitsBuilder.put(planNodeId, ImmutableListMultimap.copyOf(planNodeSplits)));
return splitsBuilder.buildOrThrow();
}

public SplitsMapping getSplits()
{
return splits;
}
Expand All @@ -2462,12 +2475,15 @@ public NodeRequirements getNodeRequirements()
return nodeRequirements;
}

public OpenTaskDescriptor update(PlanNodeId planNodeId, List<Split> splits, boolean noMoreSplits)
public OpenTaskDescriptor update(PlanNodeId planNodeId, ListMultimap<Integer, Split> splits, boolean noMoreSplits)
{
ListMultimap<PlanNodeId, Split> updatedSplits = ImmutableListMultimap.<PlanNodeId, Split>builder()
.putAll(this.splits)
.putAll(planNodeId, splits)
.build();
SplitsMapping.Builder updatedSplitsMapping = SplitsMapping.builder(this.splits);

for (Map.Entry<Integer, List<Split>> entry : Multimaps.asMap(splits).entrySet()) {
Integer sourcePartition = entry.getKey();
List<Split> partitionSplits = entry.getValue();
updatedSplitsMapping.addSplits(planNodeId, sourcePartition, partitionSplits);
}

Set<PlanNodeId> updatedNoMoreSplits = this.noMoreSplits;
if (noMoreSplits && !updatedNoMoreSplits.contains(planNodeId)) {
Expand All @@ -2477,14 +2493,14 @@ public OpenTaskDescriptor update(PlanNodeId planNodeId, List<Split> splits, bool
.build();
}
return new OpenTaskDescriptor(
updatedSplits,
updatedSplitsMapping.build(),
updatedNoMoreSplits,
nodeRequirements);
}

public TaskDescriptor createTaskDescriptor(int partitionId)
{
Set<PlanNodeId> missingNoMoreSplits = Sets.difference(splits.keySet(), noMoreSplits);
Set<PlanNodeId> missingNoMoreSplits = Sets.difference(splits.getPlanNodeIds(), noMoreSplits);
checkState(missingNoMoreSplits.isEmpty(), "missing no more splits for plan nodes: %s", missingNoMoreSplits);
return new TaskDescriptor(
partitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
Expand Down Expand Up @@ -61,6 +62,7 @@ class HashDistributionSplitAssigner

private final Set<Integer> createdTaskPartitions = new HashSet<>();
private final Set<PlanNodeId> completedSources = new HashSet<>();

private final ListMultimap<PlanNodeId, Split> replicatedSplits = ArrayListMultimap.create();

private boolean allTaskPartitionsCreated;
Expand Down Expand Up @@ -150,7 +152,7 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Spli
if (replicatedSources.contains(planNodeId)) {
replicatedSplits.putAll(planNodeId, splits.values());
for (Integer partitionId : createdTaskPartitions) {
assignment.updatePartition(new PartitionUpdate(partitionId, planNodeId, false, ImmutableList.copyOf(splits.values()), noMoreSplits));
assignment.updatePartition(new PartitionUpdate(partitionId, planNodeId, false, replicatedSourcePartition(ImmutableList.copyOf(splits.values())), noMoreSplits));
}
}
else {
Expand All @@ -167,15 +169,16 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Spli
}

for (SubPartition subPartition : subPartitions) {
assignment.updatePartition(new PartitionUpdate(subPartition.getId(), planNodeId, true, ImmutableList.of(split), false));
// todo see if having lots of PartitionUpdates is not a problem; should we merge
assignment.updatePartition(new PartitionUpdate(subPartition.getId(), planNodeId, true, ImmutableListMultimap.of(sourcePartitionId, split), false));
}
});
}

if (noMoreSplits) {
completedSources.add(planNodeId);
for (Integer taskPartition : createdTaskPartitions) {
assignment.updatePartition(new PartitionUpdate(taskPartition, planNodeId, false, ImmutableList.of(), true));
assignment.updatePartition(new PartitionUpdate(taskPartition, planNodeId, false, ImmutableListMultimap.of(), true));
}

if (completedSources.containsAll(allSources)) {
Expand All @@ -189,6 +192,13 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Spli
return assignment.build();
}

public static ListMultimap<Integer, Split> replicatedSourcePartition(List<Split> splits)
{
ImmutableListMultimap.Builder<Integer, Split> builder = ImmutableListMultimap.builder();
builder.putAll(SINGLE_SOURCE_PARTITION_ID, splits);
return builder.build();
}

@Override
public AssignmentResult finish()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.execution.scheduler.faulttolerant;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import io.trino.metadata.Split;
Expand Down Expand Up @@ -57,15 +57,15 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap<Integer, Spli
0,
planNodeId,
true,
ImmutableList.copyOf(splits.values()),
ImmutableListMultimap.copyOf(splits),
false));
}
if (noMoreSplits) {
assignment.updatePartition(new PartitionUpdate(
0,
planNodeId,
false,
ImmutableList.of(),
ImmutableListMultimap.of(),
true));
completedSources.add(planNodeId);
}
Expand Down
Loading