Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.github.ishugaliy</groupId>
<artifactId>allgood-consistent-hash</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
Expand Down Expand Up @@ -269,6 +275,11 @@
<artifactId>trino-matching</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-cache</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.inject.Inject;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.cache.CacheConfig;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.TaskManagerConfig;
Expand Down Expand Up @@ -215,6 +216,12 @@ public final class SystemSessionProperties
public static final String USE_COST_BASED_PARTITIONING = "use_cost_based_partitioning";
public static final String PUSH_FILTER_INTO_VALUES_MAX_ROW_COUNT = "push_filter_into_values_max_row_count";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String CACHE_ENABLED = "cache_enabled";
public static final String CACHE_COMMON_SUBQUERIES_ENABLED = "cache_common_subqueries_enabled";
public static final String CACHE_AGGREGATIONS_ENABLED = "cache_aggregations_enabled";
public static final String CACHE_PROJECTIONS_ENABLED = "cache_projections_enabled";
public static final String CACHE_MAX_SPLIT_SIZE = "cache_max_split_size";
public static final String CACHE_MIN_WORKER_SPLIT_SEPARATION = "cache_min_worker_split_separation";
public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size";
public static final String IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD = "idle_writer_min_data_size_threshold";
public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration";
Expand All @@ -232,6 +239,7 @@ public SystemSessionProperties()
new OptimizerConfig(),
new NodeMemoryConfig(),
new DynamicFilterConfig(),
new CacheConfig(),
new NodeSchedulerConfig());
}

Expand All @@ -244,6 +252,7 @@ public SystemSessionProperties(
OptimizerConfig optimizerConfig,
NodeMemoryConfig nodeMemoryConfig,
DynamicFilterConfig dynamicFilterConfig,
CacheConfig cacheConfig,
NodeSchedulerConfig nodeSchedulerConfig)
{
sessionProperties = ImmutableList.of(
Expand Down Expand Up @@ -1112,6 +1121,41 @@ public SystemSessionProperties(
"Enables columnar evaluation of filters",
featuresConfig.isColumnarFilterEvaluationEnabled(),
false),
booleanProperty(
CACHE_ENABLED,
"Enables subquery caching",
cacheConfig.isEnabled(),
enabled -> {
if (enabled && !cacheConfig.isEnabled()) {
throw new TrinoException(INVALID_SESSION_PROPERTY, "Subquery cache must be enabled via feature config");
}
},
true),
booleanProperty(
CACHE_COMMON_SUBQUERIES_ENABLED,
"Enables caching of common subqueries when running a single query",
cacheConfig.isEnabled() && cacheConfig.isCacheCommonSubqueriesEnabled(),
true),
booleanProperty(
CACHE_AGGREGATIONS_ENABLED,
"Enables caching of aggregations",
cacheConfig.isEnabled() && cacheConfig.isCacheAggregationsEnabled(),
true),
booleanProperty(
CACHE_PROJECTIONS_ENABLED,
"Enables caching of projections",
cacheConfig.isEnabled() && cacheConfig.isCacheProjectionsEnabled(),
true),
dataSizeProperty(
CACHE_MAX_SPLIT_SIZE,
"Max size of cached split",
cacheConfig.getMaxSplitSize(),
true),
integerProperty(
CACHE_MIN_WORKER_SPLIT_SEPARATION,
"The minimum separation (in terms of processed splits) between two splits with same cache split id being scheduled on the single worker",
cacheConfig.getCacheMinWorkerSplitSeparation(),
true),
integerProperty(PAGE_PARTITIONING_BUFFER_POOL_SIZE,
"Maximum number of free buffers in the per task partitioned page buffer pool. Setting this to zero effectively disables the pool",
taskManagerConfig.getPagePartitioningBufferPoolSize(),
Expand Down Expand Up @@ -1998,6 +2042,36 @@ public static boolean isForceSpillingOperator(Session session)
return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class);
}

public static boolean isCacheEnabled(Session session)
{
return session.getSystemProperty(CACHE_ENABLED, Boolean.class);
}

public static boolean isCacheCommonSubqueriesEnabled(Session session)
{
return session.getSystemProperty(CACHE_COMMON_SUBQUERIES_ENABLED, Boolean.class);
}

public static boolean isCacheAggregationsEnabled(Session session)
{
return session.getSystemProperty(CACHE_AGGREGATIONS_ENABLED, Boolean.class);
}

public static boolean isCacheProjectionsEnabled(Session session)
{
return session.getSystemProperty(CACHE_PROJECTIONS_ENABLED, Boolean.class);
}

public static DataSize getCacheMaxSplitSize(Session session)
{
return session.getSystemProperty(CACHE_MAX_SPLIT_SIZE, DataSize.class);
}

public static int getCacheMinWorkerSplitSeparation(Session session)
{
return session.getSystemProperty(CACHE_MIN_WORKER_SPLIT_SEPARATION, Integer.class);
}

public static int getPagePartitioningBufferPoolSize(Session session)
{
return session.getSystemProperty(PAGE_PARTITIONING_BUFFER_POOL_SIZE, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.cache;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.spi.cache.CacheManager;
import io.trino.sql.PlannerContext;
import io.trino.sql.planner.PlanNodeIdAllocator;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.iterative.Lookup;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.CacheDataPlanNode;
import io.trino.sql.planner.plan.ChooseAlternativeNode;
import io.trino.sql.planner.plan.LoadCachedDataPlanNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.SimplePlanRewriter;

import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.SystemSessionProperties.isCacheEnabled;
import static io.trino.cache.CommonSubqueriesExtractor.extractCommonSubqueries;
import static io.trino.sql.planner.iterative.Lookup.noLookup;
import static java.util.Objects.requireNonNull;

/**
 * Rewrites a query plan so that every extracted common subquery is replaced with a
 * {@link ChooseAlternativeNode} offering three execution strategies:
 * <ul>
 * <li>the original subplan,</li>
 * <li>a subplan that stores produced pages in the {@link CacheManager},</li>
 * <li>a subplan that serves pages back from the {@link CacheManager}.</li>
 * </ul>
 */
public class CacheCommonSubqueries
{
    // Fixed positions of the alternatives within a cache ChooseAlternativeNode's sources.
    public static final int ORIGINAL_PLAN_ALTERNATIVE = 0;
    public static final int STORE_PAGES_ALTERNATIVE = 1;
    public static final int LOAD_PAGES_ALTERNATIVE = 2;

    private final boolean cacheEnabled;
    private final CacheController cacheController;
    private final PlannerContext plannerContext;
    private final Session session;
    private final PlanNodeIdAllocator idAllocator;
    private final SymbolAllocator symbolAllocator;

    public CacheCommonSubqueries(
            CacheController cacheController,
            PlannerContext plannerContext,
            Session session,
            PlanNodeIdAllocator idAllocator,
            SymbolAllocator symbolAllocator)
    {
        this.cacheController = requireNonNull(cacheController, "cacheController is null");
        this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
        this.session = requireNonNull(session, "session is null");
        this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
        this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
        this.cacheEnabled = isCacheEnabled(session);
    }

    /**
     * Substitutes each common subquery found under {@code node} with a three-way
     * {@link ChooseAlternativeNode}. Returns the plan untouched when caching is
     * disabled for the session.
     */
    public PlanNode cacheSubqueries(PlanNode node)
    {
        if (!cacheEnabled) {
            return node;
        }

        Map<PlanNode, CommonPlanAdaptation> commonSubqueries = extractCommonSubqueries(
                cacheController,
                plannerContext,
                session,
                idAllocator,
                symbolAllocator,
                node);

        ImmutableMap.Builder<PlanNode, PlanNode> replacements = ImmutableMap.builder();
        for (Map.Entry<PlanNode, CommonPlanAdaptation> subquery : commonSubqueries.entrySet()) {
            replacements.put(subquery.getKey(), createAlternativesNode(subquery.getKey(), subquery.getValue()));
        }

        return SimplePlanRewriter.rewriteWith(new PlanReplacer(replacements.buildOrThrow()), node);
    }

    // Builds the three-alternative node for a single common subquery.
    private ChooseAlternativeNode createAlternativesNode(PlanNode originalSubplan, CommonPlanAdaptation adaptation)
    {
        PlanNode storeAlternative = adaptation.adaptCommonSubplan(
                new CacheDataPlanNode(
                        idAllocator.getNextId(),
                        adaptation.getCommonSubplan()),
                idAllocator);

        PlanNode loadAlternative = adaptation.adaptCommonSubplan(
                new LoadCachedDataPlanNode(
                        idAllocator.getNextId(),
                        adaptation.getCommonSubplanSignature(),
                        adaptation.getCommonDynamicFilterDisjuncts(),
                        adaptation.getCommonColumnHandles(),
                        adaptation.getCommonSubplan().getOutputSymbols()),
                idAllocator);

        // Index the array through the public constants so the ordering cannot drift from them.
        PlanNode[] alternatives = new PlanNode[3];
        alternatives[ORIGINAL_PLAN_ALTERNATIVE] = originalSubplan;
        alternatives[STORE_PAGES_ALTERNATIVE] = storeAlternative;
        alternatives[LOAD_PAGES_ALTERNATIVE] = loadAlternative;

        return new ChooseAlternativeNode(
                idAllocator.getNextId(),
                ImmutableList.copyOf(alternatives),
                adaptation.getCommonSubplanFilteredTableScan());
    }

    public static boolean isCacheChooseAlternativeNode(PlanNode node)
    {
        return isCacheChooseAlternativeNode(node, noLookup());
    }

    /**
     * Determines whether {@code node} is a {@link ChooseAlternativeNode} produced by this
     * rewriter: one with exactly three sources whose load alternative contains a
     * {@link LoadCachedDataPlanNode}.
     */
    public static boolean isCacheChooseAlternativeNode(PlanNode node, Lookup lookup)
    {
        return node instanceof ChooseAlternativeNode alternativeNode
                && alternativeNode.getSources().size() == 3
                && PlanNodeSearcher.searchFrom(alternativeNode.getSources().get(LOAD_PAGES_ALTERNATIVE), lookup)
                        .whereIsInstanceOfAny(LoadCachedDataPlanNode.class)
                        .matches();
    }

    /**
     * Returns the single {@link LoadCachedDataPlanNode} below the load alternative of a
     * cache {@link ChooseAlternativeNode}.
     *
     * @throws IllegalArgumentException if {@code node} was not produced by this rewriter
     */
    public static LoadCachedDataPlanNode getLoadCachedDataPlanNode(ChooseAlternativeNode node)
    {
        checkArgument(isCacheChooseAlternativeNode(node), "ChooseAlternativeNode should contain cache alternatives");
        PlanNode loadAlternative = node.getSources().get(LOAD_PAGES_ALTERNATIVE);
        return (LoadCachedDataPlanNode) PlanNodeSearcher.searchFrom(loadAlternative)
                .whereIsInstanceOfAny(LoadCachedDataPlanNode.class)
                .findOnlyElement();
    }

    // Swaps whole subtrees: nodes present in the mapping are replaced with their
    // precomputed alternative node; everything else is rewritten recursively.
    private static class PlanReplacer
            extends SimplePlanRewriter<Void>
    {
        private final Map<PlanNode, PlanNode> mapping;

        public PlanReplacer(Map<PlanNode, PlanNode> mapping)
        {
            this.mapping = requireNonNull(mapping, "mapping is null");
        }

        @Override
        protected PlanNode visitPlan(PlanNode node, RewriteContext<Void> context)
        {
            // ImmutableMap never holds null values, so a null lookup means "not mapped".
            PlanNode replacement = mapping.get(node);
            if (replacement != null) {
                return replacement;
            }
            return context.defaultRewrite(node, context.get());
        }
    }
}
Loading