Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

Expand All @@ -29,24 +30,36 @@ public class CachingTableStatsProvider
{
private final Metadata metadata;
private final Session session;
private final Supplier<Boolean> isQueryDone;

private final Map<TableHandle, TableStatistics> cache = new WeakHashMap<>();

public CachingTableStatsProvider(Metadata metadata, Session session)
public CachingTableStatsProvider(Metadata metadata, Session session, Supplier<Boolean> isQueryDone)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
this.isQueryDone = requireNonNull(isQueryDone, "isQueryDone is null");
}

@Override
public TableStatistics getTableStatistics(TableHandle tableHandle)
{
TableStatistics stats = cache.get(tableHandle);
if (stats == null) {
stats = metadata.getTableStatistics(session, tableHandle);
cache.put(tableHandle, stats);
return cache.computeIfAbsent(tableHandle, this::getTableStatisticsInternal);
}

private TableStatistics getTableStatisticsInternal(TableHandle tableHandle)
{
try {
return metadata.getTableStatistics(session, tableHandle);
}
catch (RuntimeException e) {
if (isQueryDone.get()) {
// getting statistics for finished query may result in many different exceptions being thrown.
// As we do not care about the result anyway mask it by returning empty statistics.
return TableStatistics.empty();
}
throw e;
}
return stats;
}

public Map<TableHandle, TableStatistics> getCachedTableStatistics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public void start()
}, directExecutor());

try {
CachingTableStatsProvider tableStatsProvider = new CachingTableStatsProvider(plannerContext.getMetadata(), getSession());
CachingTableStatsProvider tableStatsProvider = new CachingTableStatsProvider(plannerContext.getMetadata(), getSession(), stateMachine::isDone);
PlanRoot plan = planQuery(tableStatsProvider);
// DynamicFilterService needs plan for query to be registered.
// Query should be registered before dynamic filter suppliers are requested in distribution planning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.execution.QueryManager;
import io.trino.metadata.LanguageFunctionManager.RunAsIdentityLoader;
import io.trino.security.AccessControl;
import io.trino.security.InjectedConnectorAccessControl;
Expand Down Expand Up @@ -129,7 +128,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -195,7 +193,6 @@ public final class MetadataManager
private final TableFunctionRegistry tableFunctionRegistry;
private final TypeManager typeManager;
private final TypeCoercion typeCoercion;
private final QueryManager queryManager;

private final ConcurrentMap<QueryId, QueryCatalogs> catalogsByQueryId = new ConcurrentHashMap<>();

Expand All @@ -207,15 +204,13 @@ public MetadataManager(
GlobalFunctionCatalog globalFunctionCatalog,
LanguageFunctionManager languageFunctionManager,
TableFunctionRegistry tableFunctionRegistry,
TypeManager typeManager,
QueryManager queryManager)
TypeManager typeManager)
{
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
functions = requireNonNull(globalFunctionCatalog, "globalFunctionCatalog is null");
functionResolver = new BuiltinFunctionResolver(this, typeManager, globalFunctionCatalog);
this.typeCoercion = new TypeCoercion(typeManager::getType);
this.queryManager = requireNonNull(queryManager, "queryManager is null");

this.systemSecurityMetadata = requireNonNull(systemSecurityMetadata, "systemSecurityMetadata is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
Expand Down Expand Up @@ -489,33 +484,11 @@ public TableMetadata getTableMetadata(Session session, TableHandle tableHandle)
@Override
public TableStatistics getTableStatistics(Session session, TableHandle tableHandle)
{
try {
CatalogHandle catalogHandle = tableHandle.catalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
TableStatistics tableStatistics = metadata.getTableStatistics(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle());
verifyNotNull(tableStatistics, "%s returned null tableStatistics for %s", metadata, tableHandle);
return tableStatistics;
}
catch (RuntimeException e) {
if (isQueryDone(session)) {
// getting statistics for finished query may result in many different execeptions being thrown.
// As we do not care about the result anyway mask it by returning empty statistics.
return TableStatistics.empty();
}
throw e;
}
}

private boolean isQueryDone(Session session)
{
boolean done;
try {
done = queryManager.getQueryState(session.getQueryId()).isDone();
}
catch (NoSuchElementException ex) {
done = true;
}
return done;
CatalogHandle catalogHandle = tableHandle.catalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
TableStatistics tableStatistics = metadata.getTableStatistics(session.toConnectorSession(catalogHandle), tableHandle.connectorHandle());
verifyNotNull(tableStatistics, "%s returned null tableStatistics for %s", metadata, tableHandle);
return tableStatistics;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public Plan getLogicalPlan(Session session, Statement statement, List<Expression
costCalculator,
warningCollector,
planOptimizersStatsCollector,
new CachingTableStatsProvider(plannerContext.getMetadata(), session));
new CachingTableStatsProvider(plannerContext.getMetadata(), session, () -> false));
return logicalPlanner.plan(analysis, OPTIMIZED_AND_VALIDATED, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ protected Node visitShowStats(ShowStats node, Void context)
{
Query query = getRelation(node);
Plan plan = queryExplainer.getLogicalPlan(session, query, parameters, warningCollector, planOptimizersStatsCollector);
CachingStatsProvider cachingStatsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(metadata, session));
CachingStatsProvider cachingStatsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(metadata, session, () -> false));
PlanNodeStatsEstimate stats = cachingStatsProvider.getStats(plan.getRoot());
return rewriteShowStats(plan, stats);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
globalFunctionCatalog,
languageFunctionManager,
tableFunctionRegistry,
typeManager,
new NotImplementedQueryManager());
typeManager);
typeRegistry.addType(new JsonPath2016Type(new TypeDeserializer(typeManager), blockEncodingSerde));
this.joinCompiler = new JoinCompiler(typeOperators);
this.hashStrategyCompiler = new FlatHashStrategyCompiler(typeOperators);
Expand Down Expand Up @@ -914,7 +913,7 @@ public Plan createPlan(Session session, @Language("SQL") String sql, List<PlanOp
costCalculator,
warningCollector,
planOptimizersStatsCollector,
new CachingTableStatsProvider(getPlannerContext().getMetadata(), session));
new CachingTableStatsProvider(getPlannerContext().getMetadata(), session, () -> false));

Analysis analysis = analyzer.analyze(preparedQuery.getStatement());
// make PlanTester always compute plan statistics for test purposes
Expand All @@ -931,7 +930,7 @@ public SubPlan createAdaptivePlan(Session session, SubPlan subPlan, List<Adaptiv
new PlanSanityChecker(false),
warningCollector,
planOptimizersStatsCollector,
new CachingTableStatsProvider(getPlannerContext().getMetadata(), session));
new CachingTableStatsProvider(getPlannerContext().getMetadata(), session, () -> false));
return adaptivePlanner.optimize(subPlan, runtimeInfoProvider);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public StatsCalculatorAssertion check(Consumer<PlanNodeStatsAssertion> statistic
this::getSourceStats,
noLookup(),
session,
tableStatsProvider.orElseGet(() -> new CachingTableStatsProvider(queryRunner.getPlannerContext().getMetadata(), session)),
tableStatsProvider.orElseGet(() -> new CachingTableStatsProvider(queryRunner.getPlannerContext().getMetadata(), session, () -> false)),
runtimeInfoProvider));
statisticsAssertionConsumer.accept(PlanNodeStatsAssertion.assertThat(statsEstimate));
return this;
Expand All @@ -113,7 +113,7 @@ public StatsCalculatorAssertion check(Rule<?> rule, Consumer<PlanNodeStatsAssert
this::getSourceStats,
noLookup(),
session,
tableStatsProvider.orElseGet(() -> new CachingTableStatsProvider(queryRunner.getPlannerContext().getMetadata(), session)),
tableStatsProvider.orElseGet(() -> new CachingTableStatsProvider(queryRunner.getPlannerContext().getMetadata(), session, () -> false)),
runtimeInfoProvider));
checkState(statsEstimate.isPresent(), "Expected stats estimates to be present");
statisticsAssertionConsumer.accept(PlanNodeStatsAssertion.assertThat(statsEstimate.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ private CostAssertionBuilder assertCostFragmentedPlan(
Map<String, PlanCostEstimate> costs,
Map<String, PlanNodeStatsEstimate> stats)
{
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator(stats), session, new CachingTableStatsProvider(planTester.getPlannerContext().getMetadata(), session));
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator(stats), session, new CachingTableStatsProvider(planTester.getPlannerContext().getMetadata(), session, () -> false));
CostProvider costProvider = new TestingCostProvider(costs, costCalculatorUsingExchanges, statsProvider, session);
SubPlan subPlan = fragment(new Plan(node, StatsAndCosts.create(node, statsProvider, costProvider)));
return new CostAssertionBuilder(subPlan.getFragment().getStatsAndCosts().getCosts().getOrDefault(node.getId(), PlanCostEstimate.unknown()));
Expand Down Expand Up @@ -658,14 +658,14 @@ private PlanCostEstimate calculateCost(

private PlanCostEstimate calculateCost(PlanNode node, CostCalculator costCalculator, StatsCalculator statsCalculator)
{
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(planTester.getPlannerContext().getMetadata(), session));
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(planTester.getPlannerContext().getMetadata(), session, () -> false));
CostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, Optional.empty(), session);
return costProvider.getCost(node);
}

private PlanCostEstimate calculateCostFragmentedPlan(PlanNode node, StatsCalculator statsCalculator)
{
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(planTester.getPlannerContext().getMetadata(), session));
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(planTester.getPlannerContext().getMetadata(), session, () -> false));
CostProvider costProvider = new CachingCostProvider(costCalculatorUsingExchanges, statsProvider, Optional.empty(), session);
SubPlan subPlan = fragment(new Plan(node, StatsAndCosts.create(node, statsProvider, costProvider)));
return subPlan.getFragment().getStatsAndCosts().getCosts().getOrDefault(node.getId(), PlanCostEstimate.unknown());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.parser.SqlParser;
import io.trino.testing.NotImplementedQueryManager;
import io.trino.transaction.TransactionManager;
import io.trino.type.BlockTypeOperators;

Expand Down Expand Up @@ -112,8 +111,7 @@ public MetadataManager build()
globalFunctionCatalog,
languageFunctionManager,
tableFunctionRegistry,
typeManager,
new NotImplementedQueryManager());
typeManager);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void assertPlan(Session session, Metadata metadata, FunctionManage

public static void assertPlan(Session session, Metadata metadata, FunctionManager functionManager, StatsCalculator statsCalculator, Plan actual, Lookup lookup, PlanMatchPattern pattern)
{
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(metadata, session));
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, new CachingTableStatsProvider(metadata, session, () -> false));
assertPlan(session, metadata, functionManager, statsProvider, actual, lookup, pattern);
}

Expand Down
Loading