Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -209,9 +210,9 @@ public void close()
public static class ArbitraryDistributionTaskSource
implements TaskSource
{
private final Map<PlanFragmentId, PlanNodeId> sourceFragmentToRemoteSourceNodeIdMap;
private final Map<PlanFragmentId, Exchange> sourceExchanges;
private final Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles;
private final IdentityHashMap<ExchangeSourceHandle, Exchange> sourceExchanges;
private final Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles;
private final Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles;
private final long targetPartitionSizeInBytes;

private boolean finished;
Expand All @@ -223,33 +224,33 @@ public static ArbitraryDistributionTaskSource create(
DataSize targetPartitionSize)
{
checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources());
checkArgument(fragment.getRemoteSourceNodes().stream().noneMatch(node -> node.getExchangeType() == REPLICATE), "replicated exchanges are not expected in source distributed stage, got: %s", fragment.getRemoteSourceNodes());
IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandleMap = getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles);

return new ArbitraryDistributionTaskSource(
getSourceFragmentToRemoteSourceNodeIdMap(fragment.getRemoteSourceNodes()),
sourceExchanges,
exchangeSourceHandles,
exchangeForHandleMap,
getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles),
getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles),
targetPartitionSize);
}

public ArbitraryDistributionTaskSource(
Map<PlanFragmentId, PlanNodeId> sourceFragmentToRemoteSourceNodeIdMap,
Map<PlanFragmentId, Exchange> sourceExchanges,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles,
IdentityHashMap<ExchangeSourceHandle, Exchange> sourceExchanges,
Multimap<PlanNodeId, ExchangeSourceHandle> partitionedExchangeSourceHandles,
Multimap<PlanNodeId, ExchangeSourceHandle> replicatedExchangeSourceHandles,
DataSize targetPartitionSize)
{
this.sourceFragmentToRemoteSourceNodeIdMap = ImmutableMap.copyOf(requireNonNull(sourceFragmentToRemoteSourceNodeIdMap, "sourceFragmentToRemoteSourceNodeIdMap is null"));
this.sourceExchanges = ImmutableMap.copyOf(requireNonNull(sourceExchanges, "sourceExchanges is null"));
this.sourceExchanges = new IdentityHashMap<>(requireNonNull(sourceExchanges, "sourceExchanges is null"));
this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null"));
this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null"));
checkArgument(
sourceFragmentToRemoteSourceNodeIdMap.keySet().equals(sourceExchanges.keySet()),
"sourceFragmentToRemoteSourceNodeIdMap and sourceExchanges are expected to have the same set of keys: %s != %s",
sourceFragmentToRemoteSourceNodeIdMap.keySet(),
sourceExchanges.keySet().containsAll(partitionedExchangeSourceHandles.values()),
"Unexpected entries in partitionedExchangeSourceHandles map: %s; allowed keys: %s",
partitionedExchangeSourceHandles.values(),
sourceExchanges.keySet());
this.exchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null"));
checkArgument(
sourceExchanges.keySet().containsAll(exchangeSourceHandles.keySet()),
"Unexpected keys in exchangeSourceHandles map: %s; allowed keys: %s",
exchangeSourceHandles.keySet(),
sourceExchanges.keySet().containsAll(replicatedExchangeSourceHandles.values()),
"Unexpected entries in replicatedExchangeSourceHandles map: %s; allowed keys: %s",
replicatedExchangeSourceHandles.values(),
sourceExchanges.keySet());
this.targetPartitionSizeInBytes = requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes();
}
Expand All @@ -266,11 +267,10 @@ public List<TaskDescriptor> getMoreTasks()
long assignedExchangeDataSize = 0;
int assignedExchangeSourceHandleCount = 0;

for (Map.Entry<PlanFragmentId, ExchangeSourceHandle> entry : exchangeSourceHandles.entries()) {
PlanFragmentId sourceFragmentId = entry.getKey();
PlanNodeId remoteSourcePlanNodeId = sourceFragmentToRemoteSourceNodeIdMap.get(sourceFragmentId);
for (Map.Entry<PlanNodeId, ExchangeSourceHandle> entry : partitionedExchangeSourceHandles.entries()) {
PlanNodeId remoteSourcePlanNodeId = entry.getKey();
ExchangeSourceHandle originalExchangeSourceHandle = entry.getValue();
Exchange sourceExchange = sourceExchanges.get(sourceFragmentId);
Exchange sourceExchange = sourceExchanges.get(originalExchangeSourceHandle);

ExchangeSourceSplitter splitter = sourceExchange.split(originalExchangeSourceHandle, targetPartitionSizeInBytes);
ImmutableList.Builder<ExchangeSourceHandle> sourceHandles = ImmutableList.builder();
Expand All @@ -286,6 +286,7 @@ public List<TaskDescriptor> getMoreTasks()
for (ExchangeSourceHandle handle : sourceHandles.build()) {
ExchangeSourceStatistics statistics = sourceExchange.getExchangeSourceStatistics(handle);
if (assignedExchangeDataSize != 0 && assignedExchangeDataSize + statistics.getSizeInBytes() > targetPartitionSizeInBytes) {
assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles);
result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements));
assignedExchangeSourceHandles = ImmutableListMultimap.builder();
assignedExchangeDataSize = 0;
Expand All @@ -299,6 +300,7 @@ public List<TaskDescriptor> getMoreTasks()
}

if (assignedExchangeSourceHandleCount > 0) {
assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles);
result.add(new TaskDescriptor(currentPartitionId, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements));
}

Expand Down Expand Up @@ -704,6 +706,21 @@ public void close()
}
}

private static IdentityHashMap<ExchangeSourceHandle, Exchange> getExchangeForHandleMap(
Map<PlanFragmentId, Exchange> sourceExchanges,
Multimap<PlanFragmentId, ExchangeSourceHandle> exchangeSourceHandles)
{
IdentityHashMap<ExchangeSourceHandle, Exchange> exchangeForHandle = new IdentityHashMap<>();
for (Map.Entry<PlanFragmentId, ExchangeSourceHandle> entry : exchangeSourceHandles.entries()) {
PlanFragmentId fragmentId = entry.getKey();
ExchangeSourceHandle handle = entry.getValue();
Exchange exchange = sourceExchanges.get(fragmentId);
requireNonNull(exchange, "Exchange not found for fragment " + fragmentId);
exchangeForHandle.put(handle, exchange);
}
return exchangeForHandle;
}

private static ListMultimap<PlanNodeId, ExchangeSourceHandle> getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap<PlanFragmentId, ExchangeSourceHandle> handles)
{
return getInputsForRemoteSources(
Expand Down
Loading