Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.PlanStatistics;

import java.util.ArrayList;
Expand All @@ -29,26 +30,26 @@ public class HistoricalPlanStatisticsUtil
private HistoricalPlanStatisticsUtil() {}

/**
* Returns predicted plan statistics depending on historical runs
* Returns historical plan statistics entry containing predicted plan statistics depending on historical runs
*/
public static PlanStatistics getPredictedPlanStatistics(
public static Optional<HistoricalPlanStatisticsEntry> getSelectedHistoricalPlanStatisticsEntry(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics,
double historyMatchingThreshold)
{
List<HistoricalPlanStatisticsEntry> lastRunsStatistics = historicalPlanStatistics.getLastRunsStatistics();
if (lastRunsStatistics.isEmpty()) {
return PlanStatistics.empty();
return Optional.empty();
}

Optional<Integer> similarStatsIndex = getSimilarStatsIndex(historicalPlanStatistics, inputTableStatistics, historyMatchingThreshold);

if (similarStatsIndex.isPresent()) {
return lastRunsStatistics.get(similarStatsIndex.get()).getPlanStatistics();
return Optional.of(lastRunsStatistics.get(similarStatsIndex.get()));
}

// TODO: Use linear regression to predict stats if we have only 1 table.
return PlanStatistics.empty();
return Optional.empty();
}

/**
Expand All @@ -58,7 +59,8 @@ public static HistoricalPlanStatistics updatePlanStatistics(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics,
PlanStatistics current,
HistoryBasedOptimizationConfig config)
HistoryBasedOptimizationConfig config,
HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo)
{
List<HistoricalPlanStatisticsEntry> lastRunsStatistics = historicalPlanStatistics.getLastRunsStatistics();

Expand All @@ -69,7 +71,7 @@ public static HistoricalPlanStatistics updatePlanStatistics(
newLastRunsStatistics.remove(similarStatsIndex.get().intValue());
}

newLastRunsStatistics.add(new HistoricalPlanStatisticsEntry(current, inputTableStatistics));
newLastRunsStatistics.add(new HistoricalPlanStatisticsEntry(current, inputTableStatistics, historicalPlanStatisticsEntryInfo));
int maxLastRuns = inputTableStatistics.isEmpty() ? 1 : config.getMaxLastRunsHistory();
if (newLastRunsStatistics.size() > maxLastRuns) {
newLastRunsStatistics.remove(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
import com.facebook.presto.spi.statistics.PlanStatistics;
Expand All @@ -41,7 +42,7 @@
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES;
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getPredictedPlanStatistics;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getSelectedHistoricalPlanStatisticsEntry;
import static com.facebook.presto.cost.HistoryBasedPlanStatisticsManager.historyBasedPlanCanonicalizationStrategyList;
import static com.facebook.presto.sql.planner.iterative.Plans.resolveGroupReferences;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -194,11 +195,14 @@ private PlanNodeStatsEstimate getStatistics(PlanNode planNode, Session session,
if (allHashes.containsKey(strategy) && entry.getKey().getHash().isPresent() && allHashes.get(strategy).equals(entry.getKey())) {
Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(plan, session, true);
if (inputTableStatistics.isPresent()) {
PlanStatistics predictedPlanStatistics = getPredictedPlanStatistics(entry.getValue(), inputTableStatistics.get(), historyMatchingThreshold);
if (predictedPlanStatistics.getConfidence() > 0) {
return delegateStats.combineStats(
predictedPlanStatistics,
new HistoryBasedSourceInfo(entry.getKey().getHash(), inputTableStatistics));
Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry = getSelectedHistoricalPlanStatisticsEntry(entry.getValue(), inputTableStatistics.get(), historyMatchingThreshold);
if (historicalPlanStatisticsEntry.isPresent()) {
PlanStatistics predictedPlanStatistics = historicalPlanStatisticsEntry.get().getPlanStatistics();
if (predictedPlanStatistics.getConfidence() > 0) {
return delegateStats.combineStats(
predictedPlanStatistics,
new HistoryBasedSourceInfo(entry.getKey().getHash(), inputTableStatistics, Optional.of(historicalPlanStatisticsEntry.get().getHistoricalPlanStatisticsEntryInfo())));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the source information to plan statistics returned from HBO.

}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.common.plan.PlanCanonicalizationStrategy;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.spi.statistics.EmptyPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.CachingPlanCanonicalInfoProvider;
import com.facebook.presto.sql.planner.PlanCanonicalInfoProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -39,16 +41,21 @@ public class HistoryBasedPlanStatisticsManager

private HistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider = EmptyPlanStatisticsProvider.getInstance();
private boolean statisticsProviderAdded;
private final boolean isNativeExecution;
private final String serverVersion;

@Inject
public HistoryBasedPlanStatisticsManager(ObjectMapper objectMapper, SessionPropertyManager sessionPropertyManager, Metadata metadata, HistoryBasedOptimizationConfig config)
public HistoryBasedPlanStatisticsManager(ObjectMapper objectMapper, SessionPropertyManager sessionPropertyManager, Metadata metadata, HistoryBasedOptimizationConfig config,
FeaturesConfig featuresConfig, NodeVersion nodeVersion)
{
requireNonNull(objectMapper, "objectMapper is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.historyBasedStatisticsCacheManager = new HistoryBasedStatisticsCacheManager();
ObjectMapper newObjectMapper = objectMapper.copy().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
this.planCanonicalInfoProvider = new CachingPlanCanonicalInfoProvider(historyBasedStatisticsCacheManager, newObjectMapper, metadata);
this.config = requireNonNull(config, "config is null");
this.isNativeExecution = featuresConfig.isNativeExecutionEnabled();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if it's native execution, and record the information when writing stats to HBO

this.serverVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add information for server version

}

public void addHistoryBasedPlanStatisticsProviderFactory(HistoryBasedPlanStatisticsProvider historyBasedPlanStatisticsProvider)
Expand All @@ -67,7 +74,7 @@ public HistoryBasedPlanStatisticsCalculator getHistoryBasedPlanStatisticsCalcula

public HistoryBasedPlanStatisticsTracker getHistoryBasedPlanStatisticsTracker()
{
return new HistoryBasedPlanStatisticsTracker(() -> historyBasedPlanStatisticsProvider, historyBasedStatisticsCacheManager, sessionPropertyManager, config);
return new HistoryBasedPlanStatisticsTracker(() -> historyBasedPlanStatisticsProvider, historyBasedStatisticsCacheManager, sessionPropertyManager, config, isNativeExecution, serverVersion);
}

public PlanCanonicalInfoProvider getPlanCanonicalInfoProvider()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
Expand Down Expand Up @@ -78,17 +79,23 @@ public class HistoryBasedPlanStatisticsTracker
private final HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager;
private final SessionPropertyManager sessionPropertyManager;
private final HistoryBasedOptimizationConfig config;
private final boolean isNativeExecution;
private final String serverVersion;

public HistoryBasedPlanStatisticsTracker(
Supplier<HistoryBasedPlanStatisticsProvider> historyBasedPlanStatisticsProvider,
HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager,
SessionPropertyManager sessionPropertyManager,
HistoryBasedOptimizationConfig config)
HistoryBasedOptimizationConfig config,
boolean isNativeExecution,
String serverVersion)
{
this.historyBasedPlanStatisticsProvider = requireNonNull(historyBasedPlanStatisticsProvider, "historyBasedPlanStatisticsProvider is null");
this.historyBasedStatisticsCacheManager = requireNonNull(historyBasedStatisticsCacheManager, "historyBasedStatisticsCacheManager is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.config = requireNonNull(config, "config is null");
this.isNativeExecution = isNativeExecution;
this.serverVersion = serverVersion;
}

public void updateStatistics(QueryExecution queryExecution)
Expand Down Expand Up @@ -139,6 +146,9 @@ else if (trackStatsForFailedQueries) {
return ImmutableMap.of();
}

HistoricalPlanStatisticsEntryInfo historicalPlanStatisticsEntryInfo = new HistoricalPlanStatisticsEntryInfo(
isNativeExecution ? HistoricalPlanStatisticsEntryInfo.WorkerType.CPP : HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, queryInfo.getQueryId(), serverVersion);

Map<PlanNodeId, PlanNodeStats> planNodeStatsMap = aggregateStageStats(allStages);
Map<PlanNodeWithHash, PlanStatisticsWithSourceInfo> planStatisticsMap = new HashMap<>();
Map<CanonicalPlan, PlanNodeCanonicalInfo> canonicalInfoMap = new HashMap<>();
Expand Down Expand Up @@ -216,7 +226,7 @@ else if (trackStatsForFailedQueries) {
PlanStatisticsWithSourceInfo planStatsWithSourceInfo = new PlanStatisticsWithSourceInfo(
planNode.getId(),
newPlanNodeStats,
new HistoryBasedSourceInfo(Optional.of(hash), Optional.of(inputTableStatistics)));
new HistoryBasedSourceInfo(Optional.of(hash), Optional.of(inputTableStatistics), Optional.of(historicalPlanStatisticsEntryInfo)));
planStatisticsMap.put(planNodeWithHash, planStatsWithSourceInfo);

if (isAggregation(planNode, AggregationNode.Step.FINAL) && ((AggregationNode) planNode).getAggregationId().isPresent() && trackPartialAggregationHistory(session)) {
Expand Down Expand Up @@ -317,7 +327,8 @@ public void updateStatistics(QueryInfo queryInfo)
Map<PlanNodeWithHash, HistoricalPlanStatistics> newPlanStatistics = planStatistics.entrySet().stream()
.filter(entry -> entry.getKey().getHash().isPresent() &&
entry.getValue().getSourceInfo() instanceof HistoryBasedSourceInfo &&
((HistoryBasedSourceInfo) entry.getValue().getSourceInfo()).getInputTableStatistics().isPresent())
((HistoryBasedSourceInfo) entry.getValue().getSourceInfo()).getInputTableStatistics().isPresent() &&
((HistoryBasedSourceInfo) entry.getValue().getSourceInfo()).getHistoricalPlanStatisticsEntryInfo().isPresent())
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> {
Expand All @@ -328,7 +339,8 @@ public void updateStatistics(QueryInfo queryInfo)
historicalPlanStatistics,
historyBasedSourceInfo.getInputTableStatistics().get(),
entry.getValue().getPlanStatistics(),
config);
config,
historyBasedSourceInfo.getHistoricalPlanStatisticsEntryInfo().get());
}));

if (!newPlanStatistics.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
this.statsNormalizer = new StatsNormalizer();
this.scalarStatsCalculator = new ScalarStatsCalculator(metadata);
this.filterStatsCalculator = new FilterStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer);
this.historyBasedPlanStatisticsManager = new HistoryBasedPlanStatisticsManager(objectMapper, new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig());
this.historyBasedPlanStatisticsManager = new HistoryBasedPlanStatisticsManager(objectMapper, new SessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig(), featuresConfig, new NodeVersion("1"));
this.fragmentStatsProvider = new FragmentStatsProvider();
this.statsCalculator = createNewStatsCalculator(metadata, scalarStatsCalculator, statsNormalizer, filterStatsCalculator, historyBasedPlanStatisticsManager, fragmentStatsProvider);
this.taskCountEstimator = new TaskCountEstimator(() -> nodeCountForStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
*/
package com.facebook.presto.cost;

import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
import com.facebook.presto.spi.statistics.PlanStatistics;
Expand All @@ -23,7 +26,9 @@
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getSelectedHistoricalPlanStatisticsEntry;
import static org.testng.Assert.assertEquals;

public class TestHistoricalPlanStatistics
Expand Down Expand Up @@ -97,16 +102,18 @@ private static HistoricalPlanStatistics updatePlanStatistics(
historicalPlanStatistics,
inputTableStatistics,
current,
new HistoryBasedOptimizationConfig());
new HistoryBasedOptimizationConfig(),
new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0"), "test"));
}

private static PlanStatistics getPredictedPlanStatistics(
HistoricalPlanStatistics historicalPlanStatistics,
List<PlanStatistics> inputTableStatistics)
{
return HistoricalPlanStatisticsUtil.getPredictedPlanStatistics(
Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry = getSelectedHistoricalPlanStatisticsEntry(
historicalPlanStatistics,
inputTableStatistics,
0.1);
return historicalPlanStatisticsEntry.isPresent() ? historicalPlanStatisticsEntry.get().getPlanStatistics() : PlanStatistics.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package com.facebook.presto.cost;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntryInfo;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PartialAggregationStatistics;
Expand Down Expand Up @@ -126,7 +128,8 @@ public Map<PlanNodeWithHash, HistoricalPlanStatistics> getStats(List<PlanNodeWit
if (node.getTable().toString().contains("orders")) {
return new HistoricalPlanStatistics(ImmutableList.of(new HistoricalPlanStatisticsEntry(
new PlanStatistics(Estimate.of(100), Estimate.of(1000), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty()),
ImmutableList.of(new PlanStatistics(Estimate.of(15000), Estimate.unknown(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty())))));
ImmutableList.of(new PlanStatistics(Estimate.of(15000), Estimate.unknown(), 1, JoinNodeStatistics.empty(), TableWriterNodeStatistics.empty(), PartialAggregationStatistics.empty())),
new HistoricalPlanStatisticsEntryInfo(HistoricalPlanStatisticsEntryInfo.WorkerType.JAVA, QueryId.valueOf("0"), "test"))));
}
}
return HistoricalPlanStatistics.empty();
Expand Down
Loading