@@ -37,9 +37,6 @@
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.trino.Session;
-import io.trino.connector.informationschema.InformationSchemaTableHandle;
-import io.trino.connector.system.GlobalSystemConnector;
-import io.trino.connector.system.SystemTableHandle;
import io.trino.exchange.SpoolingExchangeInput;
import io.trino.execution.BasicStageStats;
import io.trino.execution.ExecutionFailureInfo;
@@ -94,9 +91,7 @@
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
-import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.RemoteSourceNode;
-import io.trino.sql.planner.plan.TableScanNode;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -1194,19 +1189,23 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta

boolean coordinatorStage = stage.getFragment().getPartitioning().equals(COORDINATOR_DISTRIBUTION);

-boolean noMemoryFragment = isNoMemoryFragment(fragment);
if (eager) {
sourceExchanges.values().forEach(sourceExchange -> sourceExchange.setSourceHandlesDeliveryMode(EAGER));
}

+Function<PlanFragmentId, PlanFragment> planFragmentLookup = planFragmentId -> {
+StageExecution stageExecution = stageExecutions.get(getStageId(planFragmentId));
+checkArgument(stageExecution != null, "stage for fragment %s not started yet", planFragmentId);
+return stageExecution.getStageInfo().getPlan();
+};
StageExecution execution = new StageExecution(
queryStateMachine,
taskDescriptorStorage,
stage,
taskSource,
sinkPartitioningScheme,
exchange,
-noMemoryFragment,
-noMemoryFragment ? new NoMemoryPartitionMemoryEstimator() : memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment),
+memoryEstimatorFactory.createPartitionMemoryEstimator(session, fragment, planFragmentLookup),
// do not retry coordinator only tasks
coordinatorStage ? 1 : maxTaskExecutionAttempts,
schedulingPriority,
@@ -1240,36 +1239,6 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
}
}

-private boolean isNoMemoryFragment(PlanFragment fragment)
-{
-if (fragment.getRoot().getSources().stream()
-.anyMatch(planNode -> planNode instanceof RefreshMaterializedViewNode)) {
-// REFRESH MATERIALIZED VIEW will issue other SQL commands under the hood. If its task memory is
-// non-zero, then a deadlock scenario is possible if we only have a single node in the cluster.
-return true;
-}
-
-// If source fragments are not tagged as "no-memory" assume that they may produce significant amount of data.
-// We stay on the safe side an assume that we should use standard memory estimation for this fragment
-if (!fragment.getRemoteSourceNodes().stream().flatMap(node -> node.getSourceFragmentIds().stream())
-.allMatch(sourceFragmentId -> stageExecutions.get(getStageId(sourceFragmentId)).isNoMemoryFragment())) {
-return false;
-}
-
-// If fragment source is not reading any external tables or only accesses information_schema assume it does not need significant amount of memory.
-// Allow scheduling even if whole server memory is pre allocated.
-List<PlanNode> tableScanNodes = PlanNodeSearcher.searchFrom(fragment.getRoot()).whereIsInstanceOfAny(TableScanNode.class).findAll();
-return tableScanNodes.stream().allMatch(node -> isMetadataTableScan((TableScanNode) node));
-}
-
-private static boolean isMetadataTableScan(TableScanNode tableScanNode)
-{
-return (tableScanNode.getTable().getConnectorHandle() instanceof InformationSchemaTableHandle) ||
-(tableScanNode.getTable().getCatalogHandle().getCatalogName().equals(GlobalSystemConnector.NAME) &&
-(tableScanNode.getTable().getConnectorHandle() instanceof SystemTableHandle systemHandle) &&
-systemHandle.getSchemaName().equals("jdbc"));
-}
-
private StageId getStageId(PlanFragmentId fragmentId)
{
return StageId.create(queryStateMachine.getQueryId(), fragmentId);
@@ -1628,7 +1597,6 @@ private static class StageExecution
private final EventDrivenTaskSource taskSource;
private final FaultTolerantPartitioningScheme sinkPartitioningScheme;
private final Exchange exchange;
-private final boolean noMemoryFragment;
private final PartitionMemoryEstimator partitionMemoryEstimator;
private final int maxTaskExecutionAttempts;
private final int schedulingPriority;
@@ -1668,7 +1636,6 @@ private StageExecution(
EventDrivenTaskSource taskSource,
FaultTolerantPartitioningScheme sinkPartitioningScheme,
Exchange exchange,
-boolean noMemoryFragment,
PartitionMemoryEstimator partitionMemoryEstimator,
int maxTaskExecutionAttempts,
int schedulingPriority,
@@ -1687,7 +1654,6 @@
this.taskSource = requireNonNull(taskSource, "taskSource is null");
this.sinkPartitioningScheme = requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");
this.exchange = requireNonNull(exchange, "exchange is null");
-this.noMemoryFragment = noMemoryFragment;
this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
this.maxTaskExecutionAttempts = maxTaskExecutionAttempts;
this.schedulingPriority = schedulingPriority;
@@ -1765,11 +1731,6 @@ public boolean isExchangeClosed()
return exchangeClosed;
}

-public boolean isNoMemoryFragment()
-{
-return noMemoryFragment;
-}
-
public void addPartition(int partitionId, NodeRequirements nodeRequirements)
{
if (getState().isDone()) {
@@ -25,6 +25,7 @@
import io.trino.spi.ErrorCode;
import io.trino.spi.memory.MemoryPoolInfo;
import io.trino.sql.planner.PlanFragment;
+import io.trino.sql.planner.plan.PlanFragmentId;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.assertj.core.util.VisibleForTesting;
@@ -34,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -117,7 +119,10 @@ void refreshNodePoolMemoryInfos()
}

@Override
-public PartitionMemoryEstimator createPartitionMemoryEstimator(Session session, PlanFragment planFragment)
+public PartitionMemoryEstimator createPartitionMemoryEstimator(
+Session session,
+PlanFragment planFragment,
+Function<PlanFragmentId, PlanFragment> sourceFragmentLookup)
{
DataSize defaultInitialMemoryLimit = planFragment.getPartitioning().equals(COORDINATOR_DISTRIBUTION) ?
getFaultTolerantExecutionDefaultCoordinatorTaskMemory(session) :
@@ -0,0 +1,103 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.scheduler;

import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import io.trino.Session;
import io.trino.connector.informationschema.InformationSchemaTableHandle;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.connector.system.SystemTableHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.RefreshMaterializedViewNode;
import io.trino.sql.planner.plan.TableScanNode;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.List;
import java.util.function.Function;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;

public class NoMemoryAwarePartitionMemoryEstimator
{
@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForNoMemoryAwarePartitionMemoryEstimator {}

public static class Factory
implements PartitionMemoryEstimatorFactory
{
private final PartitionMemoryEstimatorFactory delegateFactory;

@Inject
public Factory(@ForNoMemoryAwarePartitionMemoryEstimator PartitionMemoryEstimatorFactory delegateFactory)
{
this.delegateFactory = requireNonNull(delegateFactory, "delegateFactory is null");
}

@Override
public PartitionMemoryEstimator createPartitionMemoryEstimator(
Session session,
PlanFragment planFragment,
Function<PlanFragmentId, PlanFragment> sourceFragmentLookup)
{
if (isNoMemoryFragment(planFragment, sourceFragmentLookup)) {
return NoMemoryPartitionMemoryEstimator.INSTANCE;
}
return delegateFactory.createPartitionMemoryEstimator(session, planFragment, sourceFragmentLookup);
}

private boolean isNoMemoryFragment(PlanFragment fragment, Function<PlanFragmentId, PlanFragment> childFragmentLookup)
{
if (fragment.getRoot().getSources().stream()
.anyMatch(planNode -> planNode instanceof RefreshMaterializedViewNode)) {
// REFRESH MATERIALIZED VIEW will issue other SQL commands under the hood. If its task memory is
// non-zero, then a deadlock scenario is possible if we only have a single node in the cluster.
return true;
}

// If the source fragments are not tagged as "no-memory", assume that they may produce a significant amount of data.
// We stay on the safe side and assume that we should use standard memory estimation for this fragment.
if (!fragment.getRemoteSourceNodes().stream().flatMap(node -> node.getSourceFragmentIds().stream())
// TODO: childFragmentLookup will be executed for the subtree of every fragment in the query plan, so each fragment will be
// analyzed multiple times. Given that the logic here is not expensive and plans are not gigantic (up to ~200 fragments),
// we can keep this as a first approach. Ultimately we should profile execution and possibly add a mechanism to avoid the repeated work.
.allMatch(sourceFragmentId -> isNoMemoryFragment(childFragmentLookup.apply(sourceFragmentId), childFragmentLookup))) {
return false;
}

// If the fragment does not read any external tables, or only accesses information_schema, assume it does not need a significant amount of memory.
// Allow scheduling even if the whole server memory is pre-allocated.
List<PlanNode> tableScanNodes = PlanNodeSearcher.searchFrom(fragment.getRoot()).whereIsInstanceOfAny(TableScanNode.class).findAll();
return tableScanNodes.stream().allMatch(node -> isMetadataTableScan((TableScanNode) node));
}

private static boolean isMetadataTableScan(TableScanNode tableScanNode)
{
return (tableScanNode.getTable().getConnectorHandle() instanceof InformationSchemaTableHandle) ||
(tableScanNode.getTable().getCatalogHandle().getCatalogName().equals(GlobalSystemConnector.NAME) &&
(tableScanNode.getTable().getConnectorHandle() instanceof SystemTableHandle systemHandle) &&
systemHandle.getSchemaName().equals("jdbc"));
}
}
}
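The TODO above leaves the recursion un-memoized, so a fragment reachable from several parents is re-analyzed once per path. One possible follow-up, sketched below with hypothetical names and not part of this change, is to thread a query-scoped memo through the recursion; the memo could not simply live on the factory, since the factory is bound as a singleton and PlanFragmentIds are only unique within one query.

// Sketch only (assumes java.util.Map is imported): memoized entry point for the
// recursive check. computeIsNoMemoryFragment is a hypothetical name for the body
// of isNoMemoryFragment above, with its recursive allMatch(...) call routed back
// through this method so every fragment is analyzed at most once.
private boolean isNoMemoryFragment(
        PlanFragment fragment,
        Function<PlanFragmentId, PlanFragment> childFragmentLookup,
        Map<PlanFragmentId, Boolean> memo)
{
    Boolean cached = memo.get(fragment.getId());
    if (cached != null) {
        return cached;
    }
    boolean noMemory = computeIsNoMemoryFragment(fragment, childFragmentLookup, memo);
    memo.put(fragment.getId(), noMemory);
    return noMemory;
}

The caller in createPartitionMemoryEstimator would then pass a fresh map per top-level invocation, or one shared across the stages of a single query.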
@@ -21,6 +21,10 @@
public class NoMemoryPartitionMemoryEstimator
implements PartitionMemoryEstimator
{
+public static final NoMemoryPartitionMemoryEstimator INSTANCE = new NoMemoryPartitionMemoryEstimator();
+
+private NoMemoryPartitionMemoryEstimator() {}

@Override
public MemoryRequirements getInitialMemoryRequirements()
{
@@ -15,9 +15,15 @@

import io.trino.Session;
import io.trino.sql.planner.PlanFragment;
+import io.trino.sql.planner.plan.PlanFragmentId;

+import java.util.function.Function;

@FunctionalInterface
public interface PartitionMemoryEstimatorFactory
{
-PartitionMemoryEstimator createPartitionMemoryEstimator(Session session, PlanFragment planFragment);
+PartitionMemoryEstimator createPartitionMemoryEstimator(
+Session session,
+PlanFragment planFragment,
+Function<PlanFragmentId, PlanFragment> sourceFragmentLookup);
}
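Since the interface still declares exactly one abstract method, the @FunctionalInterface contract continues to hold and stubs stay as terse as before; a sketch of a test stub, reusing the singleton introduced in this PR:

// Sketch: a fixed-result stub factory expressed as a lambda, e.g. for tests
PartitionMemoryEstimatorFactory stubFactory =
        (session, planFragment, sourceFragmentLookup) -> NoMemoryPartitionMemoryEstimator.INSTANCE;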
@@ -66,6 +66,8 @@
import io.trino.execution.scheduler.BinPackingNodeAllocatorService;
import io.trino.execution.scheduler.EventDrivenTaskSourceFactory;
import io.trino.execution.scheduler.ExponentialGrowthPartitionMemoryEstimator;
+import io.trino.execution.scheduler.NoMemoryAwarePartitionMemoryEstimator;
+import io.trino.execution.scheduler.NoMemoryAwarePartitionMemoryEstimator.ForNoMemoryAwarePartitionMemoryEstimator;
import io.trino.execution.scheduler.NodeAllocatorService;
import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory;
import io.trino.execution.scheduler.SplitSchedulerStats;
@@ -221,7 +223,10 @@ protected void setup(Binder binder)
// node allocator
binder.bind(BinPackingNodeAllocatorService.class).in(Scopes.SINGLETON);
binder.bind(NodeAllocatorService.class).to(BinPackingNodeAllocatorService.class);
-binder.bind(PartitionMemoryEstimatorFactory.class).to(ExponentialGrowthPartitionMemoryEstimator.Factory.class);
+binder.bind(PartitionMemoryEstimatorFactory.class).to(NoMemoryAwarePartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON);
+binder.bind(PartitionMemoryEstimatorFactory.class)
+.annotatedWith(ForNoMemoryAwarePartitionMemoryEstimator.class)
+.to(ExponentialGrowthPartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON);

// node monitor
binder.bind(ClusterSizeMonitor.class).in(Scopes.SINGLETON);
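The rebinding above is a standard Guice decoration setup: the unannotated PartitionMemoryEstimatorFactory now resolves to the wrapping NoMemoryAwarePartitionMemoryEstimator.Factory, while ExponentialGrowthPartitionMemoryEstimator.Factory stays reachable only through the binding annotation injected by the wrapper's constructor. A minimal self-contained sketch of the pattern, with all names hypothetical and not from the Trino codebase:

import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Scopes;

import java.lang.annotation.Retention;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

public class DecoratorBindingSketch
{
    @Retention(RUNTIME)
    @BindingAnnotation
    public @interface ForDelegate {}

    public interface Greeter
    {
        String greet();
    }

    public static class RealGreeter
            implements Greeter
    {
        @Override
        public String greet()
        {
            return "hello";
        }
    }

    public static class DecoratingGreeter
            implements Greeter
    {
        private final Greeter delegate;

        @Inject
        public DecoratingGreeter(@ForDelegate Greeter delegate)
        {
            this.delegate = delegate;
        }

        @Override
        public String greet()
        {
            return delegate.greet() + "!";
        }
    }

    public static void main(String[] args)
    {
        Greeter greeter = Guice.createInjector(new AbstractModule()
        {
            @Override
            protected void configure()
            {
                // unannotated binding -> decorator; annotated binding -> real implementation
                bind(Greeter.class).to(DecoratingGreeter.class).in(Scopes.SINGLETON);
                bind(Greeter.class).annotatedWith(ForDelegate.class).to(RealGreeter.class).in(Scopes.SINGLETON);
            }
        }).getInstance(Greeter.class);
        System.out.println(greeter.greet()); // prints "hello!"
    }
}

The annotation keeps the delegate and the wrapper from competing for the same unannotated binding, so the delegate can later be swapped or wrapped further without touching the wrapper.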
@@ -36,6 +36,7 @@

import java.net.URI;
import java.util.Optional;
+import java.util.function.Function;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -50,6 +51,10 @@

public class TestExponentialGrowthPartitionMemoryEstimator
{
+private static final Function<PlanFragmentId, PlanFragment> THROWING_PLAN_FRAGMENT_LOOKUP = planFragmentId -> {
+throw new RuntimeException("should not be used");
+};

@Test
public void testDefaultInitialEstimation()
{
@@ -63,10 +68,10 @@ public void testDefaultInitialEstimation()
.setSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, "113MB")
.build();

-assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(COORDINATOR_DISTRIBUTION)).getInitialMemoryRequirements())
+assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(COORDINATOR_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP).getInitialMemoryRequirements())
.isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE)));

-assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION)).getInitialMemoryRequirements())
+assertThat(estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP).getInitialMemoryRequirements())
.isEqualTo(new MemoryRequirements(DataSize.of(113, MEGABYTE)));
}

@@ -82,7 +87,7 @@ public void testEstimator()
.setSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, "107MB")
.build();

-PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION));
+PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP);

assertThat(estimator.getInitialMemoryRequirements())
.isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE)));
@@ -207,7 +212,7 @@ private static void testInitialEstimationWithFinishedPartitions(
.setSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, defaultInitialTaskMemory.toString())
.build();

-PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION));
+PartitionMemoryEstimator estimator = estimatorFactory.createPartitionMemoryEstimator(session, getPlanFragment(SINGLE_DISTRIBUTION), THROWING_PLAN_FRAGMENT_LOOKUP);

for (int i = 0; i < recordedPartitionsCount; i++) {
estimator.registerPartitionFinished(new MemoryRequirements(recordedMemoryUsage), recordedMemoryUsage, true, Optional.empty());
Expand Down
Loading