Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static SqlStage createSqlStage(
{
requireNonNull(stageId, "stageId is null");
requireNonNull(fragment, "fragment is null");
checkArgument(fragment.getPartitioningScheme().getBucketToPartition().isEmpty(), "bucket to partition is not expected to be set at this point");
checkArgument(fragment.getOutputPartitioningScheme().getBucketToPartition().isEmpty(), "bucket to partition is not expected to be set at this point");
requireNonNull(tables, "tables is null");
requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
requireNonNull(session, "session is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public SqlTaskExecution create(
taskContext,
fragment.getRoot(),
TypeProvider.copyOf(fragment.getSymbols()),
fragment.getPartitioningScheme(),
fragment.getOutputPartitioningScheme(),
fragment.getPartitionedSources(),
outputBuffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, int sch
stage::recordGetSplitTime,
outputDataSizeEstimates.buildOrThrow()));

FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get(fragment.getPartitioningScheme().getPartitioning().getHandle());
FaultTolerantPartitioningScheme sinkPartitioningScheme = partitioningSchemeFactory.get(fragment.getOutputPartitioningScheme().getPartitioning().getHandle());
ExchangeContext exchangeContext = new ExchangeContext(queryStateMachine.getQueryId(), new ExchangeId("external-exchange-" + stage.getStageId().getId()));

boolean preserveOrderWithinPartition = rootFragment && stage.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ private static Map<PlanFragmentId, PipelinedOutputBufferManager> createOutputBuf

private static PipelinedOutputBufferManager createSingleStreamOutputBuffer(SqlStage stage)
{
PartitioningHandle partitioningHandle = stage.getFragment().getPartitioningScheme().getPartitioning().getHandle();
PartitioningHandle partitioningHandle = stage.getFragment().getOutputPartitioningScheme().getPartitioning().getHandle();
checkArgument(partitioningHandle.isSingleNode(), "partitioning is expected to be single node: " + partitioningHandle);
return new PartitionedPipelinedOutputBufferManager(partitioningHandle, 1);
}
Expand Down Expand Up @@ -946,7 +946,7 @@ private static Map<PlanFragmentId, Optional<int[]>> createBucketToPartitionMap(
partitioningCache,
fragment.getRoot(),
fragment.getRemoteSourceNodes(),
fragment.getPartitioningScheme().getPartitionCount());
fragment.getPartitionCount());
for (SqlStage childStage : stageManager.getChildren(stage.getStageId())) {
result.put(childStage.getFragment().getId(), bucketToPartition);
}
Expand Down Expand Up @@ -989,7 +989,7 @@ private static Map<PlanFragmentId, PipelinedOutputBufferManager> createOutputBuf
for (SqlStage parentStage : stageManager.getDistributedStagesInTopologicalOrder()) {
for (SqlStage childStage : stageManager.getChildren(parentStage.getStageId())) {
PlanFragmentId fragmentId = childStage.getFragment().getId();
PartitioningHandle partitioningHandle = childStage.getFragment().getPartitioningScheme().getPartitioning().getHandle();
PartitioningHandle partitioningHandle = childStage.getFragment().getOutputPartitioningScheme().getPartitioning().getHandle();

PipelinedOutputBufferManager outputBufferManager;
if (partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION)) {
Expand Down Expand Up @@ -1026,7 +1026,7 @@ private static StageScheduler createStageScheduler(
Session session = queryStateMachine.getSession();
PlanFragment fragment = stageExecution.getFragment();
PartitioningHandle partitioningHandle = fragment.getPartitioning();
Optional<Integer> partitionCount = fragment.getPartitioningScheme().getPartitionCount();
Optional<Integer> partitionCount = fragment.getPartitionCount();
Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
if (!splitSources.isEmpty()) {
queryStateMachine.addStateChangeListener(new StateChangeListener<>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ public class PlanFragment
private final PlanNode root;
private final Map<Symbol, Type> symbols;
private final PartitioningHandle partitioning;
private final Optional<Integer> partitionCount;
private final List<PlanNodeId> partitionedSources;
private final Set<PlanNodeId> partitionedSourcesSet;
private final List<Type> types;
private final Set<PlanNode> partitionedSourceNodes;
private final List<RemoteSourceNode> remoteSourceNodes;
private final PartitioningScheme partitioningScheme;
private final PartitioningScheme outputPartitioningScheme;
private final StatsAndCosts statsAndCosts;
private final List<CatalogProperties> activeCatalogs;
private final Optional<String> jsonRepresentation;
Expand All @@ -60,25 +61,27 @@ private PlanFragment(
PlanNode root,
Map<Symbol, Type> symbols,
PartitioningHandle partitioning,
Optional<Integer> partitionCount,
List<PlanNodeId> partitionedSources,
Set<PlanNodeId> partitionedSourcesSet,
List<Type> types,
Set<PlanNode> partitionedSourceNodes,
List<RemoteSourceNode> remoteSourceNodes,
PartitioningScheme partitioningScheme,
PartitioningScheme outputPartitioningScheme,
StatsAndCosts statsAndCosts,
List<CatalogProperties> activeCatalogs)
{
this.id = requireNonNull(id, "id is null");
this.root = requireNonNull(root, "root is null");
this.symbols = requireNonNull(symbols, "symbols is null");
this.partitioning = requireNonNull(partitioning, "partitioning is null");
this.partitionCount = requireNonNull(partitionCount, "partitionCount is null");
this.partitionedSources = requireNonNull(partitionedSources, "partitionedSources is null");
this.partitionedSourcesSet = requireNonNull(partitionedSourcesSet, "partitionedSourcesSet is null");
this.types = requireNonNull(types, "types is null");
this.partitionedSourceNodes = requireNonNull(partitionedSourceNodes, "partitionedSourceNodes is null");
this.remoteSourceNodes = requireNonNull(remoteSourceNodes, "remoteSourceNodes is null");
this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null");
this.outputPartitioningScheme = requireNonNull(outputPartitioningScheme, "outputPartitioningScheme is null");
this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null");
this.activeCatalogs = requireNonNull(activeCatalogs, "activeCatalogs is null");
this.jsonRepresentation = Optional.empty();
Expand All @@ -90,8 +93,9 @@ public PlanFragment(
@JsonProperty("root") PlanNode root,
@JsonProperty("symbols") Map<Symbol, Type> symbols,
@JsonProperty("partitioning") PartitioningHandle partitioning,
@JsonProperty("partitionCount") Optional<Integer> partitionCount,
@JsonProperty("partitionedSources") List<PlanNodeId> partitionedSources,
@JsonProperty("partitioningScheme") PartitioningScheme partitioningScheme,
@JsonProperty("outputPartitioningScheme") PartitioningScheme outputPartitioningScheme,
@JsonProperty("statsAndCosts") StatsAndCosts statsAndCosts,
@JsonProperty("activeCatalogs") List<CatalogProperties> activeCatalogs,
@JsonProperty("jsonRepresentation") Optional<String> jsonRepresentation)
Expand All @@ -100,17 +104,22 @@ public PlanFragment(
this.root = requireNonNull(root, "root is null");
this.symbols = requireNonNull(symbols, "symbols is null");
this.partitioning = requireNonNull(partitioning, "partitioning is null");
this.partitionCount = requireNonNull(partitionCount, "partitionCount is null");
this.partitionedSources = ImmutableList.copyOf(requireNonNull(partitionedSources, "partitionedSources is null"));
this.partitionedSourcesSet = ImmutableSet.copyOf(partitionedSources);
this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null");
this.activeCatalogs = requireNonNull(activeCatalogs, "activeCatalogs is null");
this.jsonRepresentation = requireNonNull(jsonRepresentation, "jsonRepresentation is null");

checkArgument(
partitionCount.isEmpty() || partitioning.getConnectorHandle() instanceof SystemPartitioningHandle,
"Connector partitioning handle should be of type system partitioning when partitionCount is present");

checkArgument(partitionedSourcesSet.size() == partitionedSources.size(), "partitionedSources contains duplicates");
checkArgument(ImmutableSet.copyOf(root.getOutputSymbols()).containsAll(partitioningScheme.getOutputLayout()),
"Root node outputs (%s) does not include all fragment outputs (%s)", root.getOutputSymbols(), partitioningScheme.getOutputLayout());
checkArgument(ImmutableSet.copyOf(root.getOutputSymbols()).containsAll(outputPartitioningScheme.getOutputLayout()),
"Root node outputs (%s) does not include all fragment outputs (%s)", root.getOutputSymbols(), outputPartitioningScheme.getOutputLayout());

types = partitioningScheme.getOutputLayout().stream()
types = outputPartitioningScheme.getOutputLayout().stream()
.map(symbols::get)
.collect(toImmutableList());

Expand All @@ -120,7 +129,7 @@ public PlanFragment(
findRemoteSourceNodes(root, remoteSourceNodes);
this.remoteSourceNodes = remoteSourceNodes.build();

this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null");
this.outputPartitioningScheme = requireNonNull(outputPartitioningScheme, "partitioningScheme is null");
}

@JsonProperty
Expand All @@ -147,6 +156,12 @@ public PartitioningHandle getPartitioning()
return partitioning;
}

@JsonProperty
/**
 * Returns the explicitly configured partition count for this fragment, if any.
 * Per the constructor's checkArgument, a value is only present when the fragment's
 * partitioning handle is a SystemPartitioningHandle; empty means no explicit count.
 */
public Optional<Integer> getPartitionCount()
{
return partitionCount;
}

@JsonProperty
public List<PlanNodeId> getPartitionedSources()
{
Expand All @@ -159,9 +174,9 @@ public boolean isPartitionedSources(PlanNodeId nodeId)
}

@JsonProperty
public PartitioningScheme getPartitioningScheme()
public PartitioningScheme getOutputPartitioningScheme()
{
return partitioningScheme;
return outputPartitioningScheme;
}

@JsonProperty
Expand Down Expand Up @@ -194,12 +209,13 @@ public PlanFragment withoutEmbeddedJsonRepresentation()
this.root,
this.symbols,
this.partitioning,
this.partitionCount,
this.partitionedSources,
this.partitionedSourcesSet,
this.types,
this.partitionedSourceNodes,
this.remoteSourceNodes,
this.partitioningScheme,
this.outputPartitioningScheme,
this.statsAndCosts,
this.activeCatalogs);
}
Expand Down Expand Up @@ -255,7 +271,7 @@ private static void findRemoteSourceNodes(PlanNode node, ImmutableList.Builder<R

public PlanFragment withBucketToPartition(Optional<int[]> bucketToPartition)
{
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme.withBucketToPartition(bucketToPartition), statsAndCosts, activeCatalogs, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, partitionCount, partitionedSources, outputPartitioningScheme.withBucketToPartition(bucketToPartition), statsAndCosts, activeCatalogs, jsonRepresentation);
}

@Override
Expand All @@ -264,8 +280,9 @@ public String toString()
return toStringHelper(this)
.add("id", id)
.add("partitioning", partitioning)
.add("partitionCount", partitionCount)
.add("partitionedSource", partitionedSources)
.add("partitionFunction", partitioningScheme)
.add("outputPartitioningScheme", outputPartitioningScheme)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub
PartitioningHandleReassigner partitioningHandleReassigner = new PartitioningHandleReassigner(fragment.getPartitioning(), metadata, session);
newRoot = SimplePlanRewriter.rewriteWith(partitioningHandleReassigner, newRoot);
}
PartitioningScheme outputPartitioningScheme = fragment.getPartitioningScheme();
PartitioningScheme outputPartitioningScheme = fragment.getOutputPartitioningScheme();
Partitioning newOutputPartitioning = outputPartitioningScheme.getPartitioning();
if (outputPartitioningScheme.getPartitioning().getHandle().getCatalogHandle().isPresent()) {
// Do not replace the handle if the source's output handle is a system one, e.g. broadcast.
Expand All @@ -182,6 +182,7 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub
newRoot,
fragment.getSymbols(),
fragment.getPartitioning(),
fragment.getPartitionCount(),
fragment.getPartitionedSources(),
new PartitioningScheme(
newOutputPartitioning,
Expand Down Expand Up @@ -249,6 +250,7 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
root,
symbols,
properties.getPartitioningHandle(),
properties.getPartitionCount(),
schedulingOrder,
properties.getPartitioningScheme(),
statsAndCosts.getForSubplan(root),
Expand Down Expand Up @@ -326,21 +328,33 @@ public PlanNode visitRefreshMaterializedView(RefreshMaterializedViewNode node, R
@Override
public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
{
node.getPartitioningScheme().ifPresent(scheme -> context.get().setDistribution(scheme.getPartitioning().getHandle(), metadata, session));
node.getPartitioningScheme().ifPresent(scheme -> context.get().setDistribution(
scheme.getPartitioning().getHandle(),
scheme.getPartitionCount(),
metadata,
session));
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitTableExecute(TableExecuteNode node, RewriteContext<FragmentProperties> context)
{
node.getPartitioningScheme().ifPresent(scheme -> context.get().setDistribution(scheme.getPartitioning().getHandle(), metadata, session));
node.getPartitioningScheme().ifPresent(scheme -> context.get().setDistribution(
scheme.getPartitioning().getHandle(),
scheme.getPartitionCount(),
metadata,
session));
return context.defaultRewrite(node, context.get());
}

@Override
public PlanNode visitMergeWriter(MergeWriterNode node, RewriteContext<FragmentProperties> context)
{
node.getPartitioningScheme().ifPresent(scheme -> context.get().setDistribution(scheme.getPartitioning().getHandle(), metadata, session));
node.getPartitioningScheme().ifPresent(scheme -> context.get().setDistribution(
scheme.getPartitioning().getHandle(),
scheme.getPartitionCount(),
metadata,
session));
return context.defaultRewrite(node, context.get());
}

Expand Down Expand Up @@ -368,7 +382,11 @@ public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProp
context.get().setSingleNodeDistribution();
}
else if (exchange.getType() == ExchangeNode.Type.REPARTITION) {
context.get().setDistribution(partitioningScheme.getPartitioning().getHandle(), metadata, session);
context.get().setDistribution(
partitioningScheme.getPartitioning().getHandle(),
partitioningScheme.getPartitionCount(),
metadata,
session);
}

ImmutableList.Builder<FragmentProperties> childrenProperties = ImmutableList.builder();
Expand Down Expand Up @@ -427,6 +445,7 @@ private static class FragmentProperties
private final PartitioningScheme partitioningScheme;

private Optional<PartitioningHandle> partitioningHandle = Optional.empty();
private Optional<Integer> partitionCount = Optional.empty();
private final Set<PlanNodeId> partitionedSources = new HashSet<>();

public FragmentProperties(PartitioningScheme partitioningScheme)
Expand Down Expand Up @@ -461,10 +480,15 @@ public FragmentProperties setSingleNodeDistribution()
return this;
}

public FragmentProperties setDistribution(PartitioningHandle distribution, Metadata metadata, Session session)
public FragmentProperties setDistribution(
PartitioningHandle distribution,
Optional<Integer> partitionCount,
Metadata metadata,
Session session)
{
if (partitioningHandle.isEmpty()) {
partitioningHandle = Optional.of(distribution);
this.partitionCount = partitionCount;
return this;
}

Expand All @@ -485,6 +509,7 @@ public FragmentProperties setDistribution(PartitioningHandle distribution, Metad

if (isCompatibleScaledWriterPartitioning(currentPartitioning, distribution)) {
this.partitioningHandle = Optional.of(distribution);
this.partitionCount = partitionCount;
return this;
}

Expand Down Expand Up @@ -597,6 +622,11 @@ public PartitioningHandle getPartitioningHandle()
return partitioningHandle.get();
}

// Partition count captured by setDistribution() alongside the partitioning handle;
// remains Optional.empty() until a distribution carrying a count is set.
public Optional<Integer> getPartitionCount()
{
return partitionCount;
}

public Set<PlanNodeId> getPartitionedSources()
{
return partitionedSources;
Expand Down
Loading