predicate()
+ {
+ return predicate;
+ }
+ }
+}
diff --git a/core/trino-main/src/main/java/io/trino/cache/CommonSubqueriesExtractor.java b/core/trino-main/src/main/java/io/trino/cache/CommonSubqueriesExtractor.java
new file mode 100644
index 000000000000..92ee3f566f9d
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/cache/CommonSubqueriesExtractor.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.cache;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Streams;
+import io.trino.Session;
+import io.trino.cache.CacheController.CacheCandidate;
+import io.trino.cache.CanonicalSubplan.AggregationKey;
+import io.trino.cache.CanonicalSubplan.FilterProjectKey;
+import io.trino.cache.CanonicalSubplan.Key;
+import io.trino.cache.CanonicalSubplan.ScanFilterProjectKey;
+import io.trino.cache.CanonicalSubplan.TableScan;
+import io.trino.cache.CanonicalSubplan.TopNKey;
+import io.trino.cache.CanonicalSubplan.TopNRankingKey;
+import io.trino.cache.CommonPlanAdaptation.PlanSignatureWithPredicate;
+import io.trino.cost.PlanNodeStatsEstimate;
+import io.trino.metadata.ResolvedFunction;
+import io.trino.spi.cache.CacheColumnId;
+import io.trino.spi.cache.CacheManager;
+import io.trino.spi.cache.CacheTableId;
+import io.trino.spi.cache.PlanSignature;
+import io.trino.spi.cache.SignatureKey;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.SortOrder;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.Type;
+import io.trino.sql.PlannerContext;
+import io.trino.sql.ir.Expression;
+import io.trino.sql.ir.IrUtils;
+import io.trino.sql.ir.Reference;
+import io.trino.sql.planner.DomainTranslator;
+import io.trino.sql.planner.DomainTranslator.ExtractionResult;
+import io.trino.sql.planner.PlanNodeIdAllocator;
+import io.trino.sql.planner.Symbol;
+import io.trino.sql.planner.SymbolAllocator;
+import io.trino.sql.planner.SymbolsExtractor;
+import io.trino.sql.planner.optimizations.SymbolMapper;
+import io.trino.sql.planner.plan.AggregationNode;
+import io.trino.sql.planner.plan.AggregationNode.Aggregation;
+import io.trino.sql.planner.plan.Assignments;
+import io.trino.sql.planner.plan.ChooseAlternativeNode.FilteredTableScan;
+import io.trino.sql.planner.plan.DataOrganizationSpecification;
+import io.trino.sql.planner.plan.FilterNode;
+import io.trino.sql.planner.plan.PlanNode;
+import io.trino.sql.planner.plan.PlanNodeId;
+import io.trino.sql.planner.plan.ProjectNode;
+import io.trino.sql.planner.plan.TableScanNode;
+import io.trino.sql.planner.plan.TopNNode;
+import io.trino.sql.planner.plan.TopNRankingNode;
+import io.trino.sql.planner.plan.ValuesNode;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Streams.forEachPair;
+import static com.google.common.collect.Streams.zip;
+import static io.trino.cache.CanonicalSubplanExtractor.canonicalExpressionToColumnId;
+import static io.trino.cache.CanonicalSubplanExtractor.canonicalSymbolToColumnId;
+import static io.trino.cache.CanonicalSubplanExtractor.columnIdToSymbol;
+import static io.trino.cache.CanonicalSubplanExtractor.extractCanonicalSubplans;
+import static io.trino.sql.ir.Booleans.TRUE;
+import static io.trino.sql.ir.ExpressionFormatter.formatExpression;
+import static io.trino.sql.ir.IrUtils.and;
+import static io.trino.sql.ir.IrUtils.combineConjuncts;
+import static io.trino.sql.ir.IrUtils.combineDisjuncts;
+import static io.trino.sql.ir.IrUtils.extractConjuncts;
+import static io.trino.sql.ir.IrUtils.or;
+import static io.trino.sql.planner.iterative.rule.ExtractCommonPredicatesExpressionRewriter.extractCommonPredicates;
+import static io.trino.sql.planner.iterative.rule.NormalizeOrExpressionRewriter.normalizeOrExpression;
+import static io.trino.sql.planner.iterative.rule.PushPredicateIntoTableScan.pushFilterIntoTableScan;
+import static io.trino.sql.planner.plan.AggregationNode.Step.PARTIAL;
+import static io.trino.sql.planner.plan.AggregationNode.singleGroupingSet;
+import static java.lang.String.format;
+import static java.util.Comparator.comparing;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Predicate.not;
+
+/**
+ * Identifies common subqueries and provides adaptations to the original query plan. The result of
+ * a common subquery evaluation is cached with {@link CacheManager}. Therefore, IO and computations
+ * are performed only once and are reused within query execution.
+ *
+ * The general idea is that if there are two subqueries, e.g.:
+ * {@code subquery1: table_scan(table) <- filter(col1 = 1) <- projection(y := col2 + 1)}
+ * {@code subquery2: table_scan(table) <- filter(col1 = 2) <- projection(z := col2 * 2)}
+ *
+ * Then such subqueries can be transformed into:
+ * {@code subquery1: table_scan(table) <- filter(col1 = 1 OR col1 = 2) <- projection(y := col2 + 1, z := col2 * 2)
+ * <- filter(col1 = 1) <- projection(y := y)}
+ * {@code subquery2: table_scan(table) <- filter(col1 = 1 OR col1 = 2) <- projection(y := col2 + 1, z := col2 * 2)
+ * <- filter(col1 = 2) <- projection(z := z)}
+ *
+ * {@code where: table_scan(table) <- filter(col1 = 1 OR col1 = 2) <- projection(y := col2 + 1, z := col2 * 2)}
+ * is a common subquery for which the results can be cached and evaluated only once.
+ */
+public final class CommonSubqueriesExtractor
+{
+ private final CacheController cacheController;
+ private final PlannerContext plannerContext;
+ private final Session session;
+ private final PlanNodeIdAllocator idAllocator;
+ private final SymbolAllocator symbolAllocator;
+ private final PlanNode root;
+ private final DomainTranslator domainTranslator;
+
+ /**
+  * Static convenience entry point: constructs a {@link CommonSubqueriesExtractor} from the given
+  * arguments and immediately runs its instance-level {@code extractCommonSubqueries()}.
+  *
+  * @return the result of the instance-level {@code extractCommonSubqueries()} for these arguments
+  * @throws NullPointerException if any argument is null (checked by the constructor)
+  */
+ public static Map extractCommonSubqueries(
+         CacheController cacheController,
+         PlannerContext plannerContext,
+         Session session,
+         PlanNodeIdAllocator idAllocator,
+         SymbolAllocator symbolAllocator,
+         PlanNode root)
+ {
+     CommonSubqueriesExtractor extractor = new CommonSubqueriesExtractor(
+             cacheController,
+             plannerContext,
+             session,
+             idAllocator,
+             symbolAllocator,
+             root);
+     return extractor.extractCommonSubqueries();
+ }
+
+ /**
+  * Creates an extractor for a single plan rooted at {@code root}.
+  * <p>
+  * Every argument is mandatory. Besides storing the arguments, the constructor creates a
+  * {@link DomainTranslator} from the metadata exposed by {@code plannerContext}.
+  *
+  * @param cacheController source of the caching candidates
+  * @param plannerContext planner context; also supplies the metadata for the domain translator
+  * @param session session passed to candidate selection and canonical subplan extraction
+  * @param idAllocator plan node id allocator kept by this extractor
+  * @param symbolAllocator symbol allocator kept by this extractor
+  * @param root plan from which canonical subplans are extracted
+  * @throws NullPointerException if any argument is null
+  */
+ public CommonSubqueriesExtractor(
+         CacheController cacheController,
+         PlannerContext plannerContext,
+         Session session,
+         PlanNodeIdAllocator idAllocator,
+         SymbolAllocator symbolAllocator,
+         PlanNode root)
+ {
+     // validate in declaration order so the first null argument determines the error message
+     this.cacheController = requireNonNull(cacheController, "cacheController is null");
+     this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
+     this.session = requireNonNull(session, "session is null");
+     this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+     this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
+     this.root = requireNonNull(root, "root is null");
+
+     // the field was null-checked above, so it is safe to dereference here
+     this.domainTranslator = new DomainTranslator(this.plannerContext.getMetadata());
+ }
+
+ /**
+  * Builds plan adaptations for every caching candidate that still has enough unclaimed subplans.
+  * <p>
+  * Candidates are visited in the order returned by the {@link CacheController}. Once a subplan has
+  * been adapted as part of one candidate, its table scan id is remembered and the subplan is
+  * filtered out of all later candidates. A candidate whose remaining subplans number fewer than
+  * its {@code minSubplans()} is skipped without claiming anything.
+  *
+  * @return immutable map from the original plan node of each adapted subplan to its adaptation
+  * @throws IllegalStateException if {@code adaptSubplans} does not return exactly one adaptation
+  * per subplan
+  * @throws IllegalArgumentException if the same original plan node would be registered twice
+  * (raised by {@code ImmutableMap.Builder#buildOrThrow})
+  */
+ public Map extractCommonSubqueries()
+ {
+     List cacheCandidates = cacheController.getCachingCandidates(
+             session,
+             extractCanonicalSubplans(plannerContext, session, root));
+     ImmutableMap.Builder planAdaptations = ImmutableMap.builder();
+
+     // table scan ids of subplans for which a common subplan was already extracted
+     Set processedSubplans = new HashSet<>();
+     for (CacheCandidate cacheCandidate : cacheCandidates) {
+         // keep only the subplans that no earlier candidate has claimed
+         List subplans = cacheCandidate.subplans().stream()
+                 .filter(subplan -> !processedSubplans.contains(subplan.getTableScanId()))
+                 .collect(toImmutableList());
+
+         // a candidate is only worth adapting when enough subplans remain
+         if (subplans.size() >= cacheCandidate.minSubplans()) {
+             subplans.forEach(subplan -> processedSubplans.add(subplan.getTableScanId()));
+
+             List adaptations = adaptSubplans(subplans);
+             checkState(adaptations.size() == subplans.size());
+
+             // sizes are equal (checked above), so positions pair each subplan with its adaptation
+             for (int position = 0; position < subplans.size(); position++) {
+                 planAdaptations.put(subplans.get(position).getOriginalPlanNode(), adaptations.get(position));
+             }
+         }
+     }
+     return planAdaptations.buildOrThrow();
+ }
+
+ private List adaptSubplans(List