@@ -54,6 +54,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
@@ -133,77 +134,97 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
         // In order to preserve the fixpoint, we will "pretend" the newly added C2 table scan is part of C1's job to maintain.
         for (ConnectorId connectorId : connectorIds.build()) {
             Set<ConnectorPlanOptimizer> optimizers = connectorOptimizers.get(connectorId);
-            if (optimizers == null) {
+            if (optimizers == null || optimizers.isEmpty()) {
                 continue;
             }
 
+            ImmutableMap.Builder<List<ConnectorId>, Set<ConnectorPlanOptimizer>> optimizersWithConnectorRange = ImmutableMap.builder();
+            List<ConnectorId> currentConnectors = null;
+            ImmutableSet.Builder<ConnectorPlanOptimizer> currentGroup = null;
+            for (ConnectorPlanOptimizer optimizer : optimizers) {
+                List<ConnectorId> supportedConnectors = optimizer.getSupportedConnectorIds().isEmpty()
+                        ? ImmutableList.of(connectorId)
+                        : optimizer.getSupportedConnectorIds();
+
+                if (!supportedConnectors.equals(currentConnectors)) {
+                    if (currentGroup != null) {
+                        optimizersWithConnectorRange.put(currentConnectors, currentGroup.build());
+                    }
+                    currentConnectors = supportedConnectors;
+                    currentGroup = ImmutableSet.builder();
+                }
+                currentGroup.add(optimizer);
+            }
+            optimizersWithConnectorRange.put(currentConnectors, currentGroup.build());
+
             ImmutableMap.Builder<PlanNode, ConnectorPlanNodeContext> contextMapBuilder = ImmutableMap.builder();
             buildConnectorPlanNodeContext(plan, null, contextMapBuilder);
             Map<PlanNode, ConnectorPlanNodeContext> contextMap = contextMapBuilder.build();
 
-            // keep track of changed nodes; the keys are original nodes and the values are the new nodes
-            Map<PlanNode, PlanNode> updates = new HashMap<>();
-
-            // process connector optimizers
-            for (PlanNode node : contextMap.keySet()) {
-                // For a subtree with root `node` to be a max closure, the following conditions must hold:
-                // * The subtree with root `node` is a closure.
-                // * `node` has no parent, or the subtree with root as `node`'s parent is not a closure.
-                ConnectorPlanNodeContext context = contextMap.get(node);
-                if (!context.isClosure(connectorId, session) ||
-                        !context.getParent().isPresent() ||
-                        contextMap.get(context.getParent().get()).isClosure(connectorId, session)) {
-                    continue;
-                }
+            for (Map.Entry<List<ConnectorId>, Set<ConnectorPlanOptimizer>> entry : optimizersWithConnectorRange.build().entrySet()) {
+                // keep track of changed nodes; the keys are original nodes and the values are the new nodes
+                Map<PlanNode, PlanNode> updates = new HashMap<>();
+
+                // process connector optimizers
+                for (PlanNode node : contextMap.keySet()) {
+                    // For a subtree with root `node` to be a max closure, the following conditions must hold:
+                    // * The subtree with root `node` is a closure.
+                    // * `node` has no parent, or the subtree with root as `node`'s parent is not a closure.
+                    ConnectorPlanNodeContext context = contextMap.get(node);
+                    if (!context.isClosure(connectorId, session, entry.getKey()) ||
+                            !context.getParent().isPresent() ||
+                            contextMap.get(context.getParent().get()).isClosure(connectorId, session, entry.getKey())) {
[Review comment on lines +173 to +175, from a contributor]
question: Logic for max closure detection may be too restrictive for multi-connector cases.
This strict connector set equality may limit valid optimizations. Consider whether a containment check would enable more flexible closure detection. (A sketch of this containment alternative appears after the isClosure hunk below.)

+                        continue;
+                    }
 
-                PlanNode newNode = node;
+                    PlanNode newNode = node;
 
-                // the returned node is still a max closure (only if there is no new connector added, which does happen but ignored here)
-                for (ConnectorPlanOptimizer optimizer : optimizers) {
-                    long start = System.nanoTime();
-                    newNode = optimizer.optimize(newNode, session.toConnectorSession(connectorId), variableAllocator, idAllocator);
-                    if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) {
-                        session.getRuntimeStats().addMetricValue(String.format("optimizer%sTimeNanos", getOptimizerNameForLog(optimizer)), NANO, System.nanoTime() - start);
-                    }
-                }
+                    // the returned node is still a max closure (only if there is no new connector added, which does happen but ignored here)
+                    for (ConnectorPlanOptimizer optimizer : entry.getValue()) {
+                        long start = System.nanoTime();
+                        newNode = optimizer.optimize(newNode, session.toConnectorSession(connectorId), variableAllocator, idAllocator);
+                        if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) {
+                            session.getRuntimeStats().addMetricValue(String.format("optimizer%sTimeNanos", getOptimizerNameForLog(optimizer)), NANO, System.nanoTime() - start);
+                        }
+                    }
 
-                if (node != newNode) {
-                    // the optimizer has allocated a new PlanNode
-                    checkState(
-                            containsAll(ImmutableSet.copyOf(newNode.getOutputVariables()), node.getOutputVariables()),
-                            "the connector optimizer from %s returns a node that does not cover all output before optimization",
-                            connectorId);
+                    if (node != newNode) {
+                        // the optimizer has allocated a new PlanNode
+                        checkState(
+                                containsAll(ImmutableSet.copyOf(newNode.getOutputVariables()), node.getOutputVariables()),
+                                "the connector optimizer from %s returns a node that does not cover all output before optimization",
+                                connectorId);
 
-                    updates.put(node, newNode);
-                }
-            }
-            // up to this point, we have a set of updated nodes; need to recursively update their parents
+                        updates.put(node, newNode);
+                    }
+                }
+                // up to this point, we have a set of updated nodes; need to recursively update their parents
 
-            // alter the plan with a bottom-up approach (but does not have to be strict bottom-up to guarantee the correctness of the algorithm)
-            // use "original nodes" to keep track of the plan structure and "updates" to keep track of the new nodes
-            Queue<PlanNode> originalNodes = new LinkedList<>(updates.keySet());
-            while (!originalNodes.isEmpty()) {
-                PlanNode originalNode = originalNodes.poll();
+                // alter the plan with a bottom-up approach (but does not have to be strict bottom-up to guarantee the correctness of the algorithm)
+                // use "original nodes" to keep track of the plan structure and "updates" to keep track of the new nodes
+                Queue<PlanNode> originalNodes = new LinkedList<>(updates.keySet());
+                while (!originalNodes.isEmpty()) {
+                    PlanNode originalNode = originalNodes.poll();
 
-                if (!contextMap.get(originalNode).getParent().isPresent()) {
-                    // originalNode must be the root; update the plan
-                    plan = updates.get(originalNode);
-                    continue;
-                }
+                    if (!contextMap.get(originalNode).getParent().isPresent()) {
+                        // originalNode must be the root; update the plan
+                        plan = updates.get(originalNode);
+                        continue;
+                    }
 
-                PlanNode originalParent = contextMap.get(originalNode).getParent().get();
+                    PlanNode originalParent = contextMap.get(originalNode).getParent().get();
 
-                // need to create a new parent given the child has changed; the new parent needs to point to the new child.
-                // if a node has been updated, it will occur in `updates`; otherwise, just use the original node
-                ImmutableList.Builder<PlanNode> newChildren = ImmutableList.builder();
-                originalParent.getSources().forEach(child -> newChildren.add(updates.getOrDefault(child, child)));
-                PlanNode newParent = originalParent.replaceChildren(newChildren.build());
+                    // need to create a new parent given the child has changed; the new parent needs to point to the new child.
+                    // if a node has been updated, it will occur in `updates`; otherwise, just use the original node
+                    ImmutableList.Builder<PlanNode> newChildren = ImmutableList.builder();
+                    originalParent.getSources().forEach(child -> newChildren.add(updates.getOrDefault(child, child)));
+                    PlanNode newParent = originalParent.replaceChildren(newChildren.build());
 
-                // mark the new parent as updated
-                updates.put(originalParent, newParent);
+                    // mark the new parent as updated
+                    updates.put(originalParent, newParent);
 
-                // enqueue the parent node in order to recursively update its ancestors
-                originalNodes.add(originalParent);
-            }
+                    // enqueue the parent node in order to recursively update its ancestors
+                    originalNodes.add(originalParent);
+                }
+            }
         }
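
The grouping step at the top of the hunk above batches consecutive optimizers that declare the same supported-connector list, falling back to just the connector being planned when getSupportedConnectorIds() returns an empty list, so that each batch can later be matched against subtrees spanning exactly that connector range. Below is a minimal standalone sketch of the same idea, with ConnectorId simplified to String, plain JDK collections in place of the Guava builders, and illustrative names (groupByConnectorRange, PushdownFilter, and so on) that are not from the PR:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConnectorRangeGrouping
{
    // Batch consecutive optimizers whose supported-connector list is identical.
    // An empty declaration means "only the connector currently being planned".
    static Map<List<String>, List<String>> groupByConnectorRange(
            List<String> optimizers,
            Map<String, List<String>> supportedConnectors,
            String defaultConnector)
    {
        Map<List<String>, List<String>> groups = new LinkedHashMap<>();
        List<String> currentConnectors = null;
        List<String> currentGroup = null;
        for (String optimizer : optimizers) {
            List<String> supported = supportedConnectors.getOrDefault(optimizer, List.of());
            if (supported.isEmpty()) {
                supported = List.of(defaultConnector);
            }
            // a new connector list starts a new batch; flush the previous one
            if (!supported.equals(currentConnectors)) {
                if (currentGroup != null) {
                    groups.put(currentConnectors, currentGroup);
                }
                currentConnectors = supported;
                currentGroup = new ArrayList<>();
            }
            currentGroup.add(optimizer);
        }
        if (currentGroup != null) {
            groups.put(currentConnectors, currentGroup);
        }
        return groups;
    }

    public static void main(String[] args)
    {
        Map<String, List<String>> supported = Map.of(
                "PushdownFilter", List.of(),          // defaults to the planned connector
                "PushdownAgg", List.of(),
                "CrossConnectorRule", List.of("c1", "c2"));
        System.out.println(groupByConnectorRange(
                List.of("PushdownFilter", "PushdownAgg", "CrossConnectorRule"),
                supported, "c1"));
        // -> {[c1]=[PushdownFilter, PushdownAgg], [c1, c2]=[CrossConnectorRule]}
    }
}

One behavioral difference to note: the merged code collects groups into an ImmutableMap.Builder, whose build() rejects duplicate keys, so the same connector list appearing in two non-consecutive runs would fail there, while the LinkedHashMap above would silently replace the earlier group.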

@@ -306,17 +327,16 @@ public Set<Class<? extends PlanNode>> getReachablePlanNodeTypes
             return reachablePlanNodeTypes;
         }
 
-        boolean isClosure(ConnectorId connectorId, Session session)
+        boolean isClosure(ConnectorId connectorId, Session session, List<ConnectorId> supportedConnectorId)
         {
             // check if all children can reach the only connector
             boolean includeValuesNode = isIncludeValuesNodeInConnectorOptimizer(session);
             Set<ConnectorId> connectorIds = includeValuesNode ? reachableConnectors.stream().filter(x -> !x.equals(EMPTY_CONNECTOR_ID)).collect(toImmutableSet()) : reachableConnectors;
-            if (connectorIds.size() != 1 || !connectorIds.contains(connectorId)) {
-                return false;
+            if (connectorIds.contains(connectorId) && new HashSet<>(supportedConnectorId).containsAll(connectorIds) && supportedConnectorId.size() == connectorIds.size()) {
+                // check if all children are accessible by connectors
+                return containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes);
             }
 
-            // check if all children are accessible by connectors
-            return containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes);
+            return false;
         }
     }
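
To make the inline review comment above concrete: the new isClosure accepts a subtree only when its reachable connector set exactly matches the optimizer's declared list (membership, containment, and equal size), so an optimizer declaring [c1, c2] is skipped for a subtree that reaches only {c1}. The containment check the reviewer suggests would accept that subtree as well. A sketch of both predicates, with ConnectorId again simplified to String and hypothetical helper names that are not from the PR:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ClosureCheckSketch
{
    // Mirrors the merged condition: the reachable set must exactly equal the declared list.
    static boolean strictEquality(Set<String> reachable, String connectorId, List<String> supported)
    {
        return reachable.contains(connectorId)
                && new HashSet<>(supported).containsAll(reachable)
                && supported.size() == reachable.size();
    }

    // Reviewer's suggestion: the declared connectors only need to contain the reachable set.
    static boolean containment(Set<String> reachable, String connectorId, List<String> supported)
    {
        return reachable.contains(connectorId)
                && new HashSet<>(supported).containsAll(reachable);
    }

    public static void main(String[] args)
    {
        List<String> supported = List.of("c1", "c2");
        System.out.println(strictEquality(Set.of("c1"), "c1", supported)); // false: subset rejected
        System.out.println(containment(Set.of("c1"), "c1", supported));    // true: subset accepted
    }
}

In either variant, a qualifying subtree must still pass the containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes) check before it counts as a closure.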
