diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java b/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java index bf2d4f796491c..a8e22fa2f0ec3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java @@ -871,6 +871,12 @@ private Set<String> getPartitionNamesToRefreshForPartitionedMv() { Expr partitionExpr = getPartitionRefTableExprs().get(0); Pair<Table, Column> partitionInfo = getPartitionTableAndColumn(); // if non-partition-by table has changed, should refresh all mv partitions + if (partitionInfo == null) { + // mark it inactive + setActive(false); + LOG.warn("mark mv:{} inactive for get partition info failed", name); + throw new RuntimeException(String.format("getting partition info failed for mv: %s", name)); + } Table partitionTable = partitionInfo.first; boolean forceExternalTableQueryRewrite = isForceExternalTableQueryRewrite(); for (BaseTableInfo tableInfo : baseTableInfos) { @@ -966,7 +972,10 @@ public Pair<Table, Column> getPartitionTableAndColumn() { return Pair.create(table, table.getColumn(partitionSlotRef.getColumnName())); } } - return null; + String baseTableNames = baseTableInfos.stream() + .map(tableInfo -> tableInfo.getTable().getName()).collect(Collectors.joining(",")); + throw new RuntimeException( + String.format("can not find partition info for mv:%s on base tables:%s", name, baseTableNames)); } public MvRewriteContext getPlanContext() { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java index b80f7028e47d8..fae349a3d8ebb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvRewritePreprocessor.java @@ -30,6 +30,8 @@ import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator; import 
com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -40,6 +42,7 @@ import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.getMvPartialPartitionPredicates; public class MvRewritePreprocessor { + private static final Logger LOG = LogManager.getLogger(MvRewritePreprocessor.class); private final ConnectContext connectContext; private final ColumnRefFactory queryColumnRefFactory; private final OptimizerContext context; @@ -64,77 +67,86 @@ public void prepareMvCandidatesForPlan() { Set originQueryColumns = Sets.newHashSet(queryColumnRefFactory.getColumnRefs()); for (MaterializedView mv : relatedMvs) { - if (!mv.isActive()) { - continue; + try { + preprocessMv(mv, queryTables, originQueryColumns); + } catch (Exception e) { + List tableNames = queryTables.stream().map(Table::getName).collect(Collectors.toList()); + LOG.warn("preprocess mv {} failed for query tables:{}", mv.getName(), tableNames, e); } + } + } - MaterializedView.MvRewriteContext mvRewriteContext = mv.getPlanContext(); - if (mvRewriteContext == null) { - // build mv query logical plan - MaterializedViewOptimizer mvOptimizer = new MaterializedViewOptimizer(); - mvRewriteContext = mvOptimizer.optimize(mv, connectContext); - mv.setPlanContext(mvRewriteContext); - } - if (!mvRewriteContext.isValidMvPlan()) { - continue; - } + private void preprocessMv(MaterializedView mv, List queryTables, Set originQueryColumns) { + if (!mv.isActive()) { + return; + } - Set partitionNamesToRefresh = mv.getPartitionNamesToRefreshForMv(); - PartitionInfo partitionInfo = mv.getPartitionInfo(); - if (partitionInfo instanceof SinglePartitionInfo) { - if (!partitionNamesToRefresh.isEmpty()) { - continue; - } - } else if (!mv.getPartitionNames().isEmpty() && - 
partitionNamesToRefresh.containsAll(mv.getPartitionNames())) { - // if the mv is partitioned, and all partitions need refresh, - // then it can not be an candidate - continue; - } + MaterializedView.MvRewriteContext mvRewriteContext = mv.getPlanContext(); + if (mvRewriteContext == null) { + // build mv query logical plan + MaterializedViewOptimizer mvOptimizer = new MaterializedViewOptimizer(); + mvRewriteContext = mvOptimizer.optimize(mv, connectContext); + mv.setPlanContext(mvRewriteContext); + } + if (!mvRewriteContext.isValidMvPlan()) { + return; + } - OptExpression mvPlan = mvRewriteContext.getLogicalPlan(); - ScalarOperator mvPartialPartitionPredicates = null; - if (mv.getPartitionInfo() instanceof ExpressionRangePartitionInfo && !partitionNamesToRefresh.isEmpty()) { - // when mv is partitioned and there are some refreshed partitions, - // when should calculate latest partition range predicates for partition-by base table - mvPartialPartitionPredicates = getMvPartialPartitionPredicates(mv, mvPlan, partitionNamesToRefresh); - if (mvPartialPartitionPredicates == null) { - continue; - } + Set partitionNamesToRefresh = mv.getPartitionNamesToRefreshForMv(); + PartitionInfo partitionInfo = mv.getPartitionInfo(); + if (partitionInfo instanceof SinglePartitionInfo) { + if (!partitionNamesToRefresh.isEmpty()) { + return; } + } else if (!mv.getPartitionNames().isEmpty() && + partitionNamesToRefresh.containsAll(mv.getPartitionNames())) { + // if the mv is partitioned, and all partitions need refresh, + // then it can not be an candidate + return; + } - List
<Table> baseTables = MvUtils.getAllTables(mvPlan); - List<Table>
intersectingTables = baseTables.stream().filter(queryTables::contains).collect(Collectors.toList()); - MaterializationContext materializationContext = - new MaterializationContext(context, mv, mvPlan, queryColumnRefFactory, - mv.getPlanContext().getRefFactory(), partitionNamesToRefresh, - baseTables, originQueryColumns, intersectingTables, mvPartialPartitionPredicates); - List mvOutputColumns = mv.getPlanContext().getOutputColumns(); - // generate scan mv plan here to reuse it in rule applications - LogicalOlapScanOperator scanMvOp = createScanMvOperator(materializationContext); - materializationContext.setScanMvOperator(scanMvOp); - String dbName = connectContext.getGlobalStateMgr().getDb(mv.getDbId()).getFullName(); - connectContext.getDumpInfo().addTable(dbName, mv); - // should keep the sequence of schema - List scanMvOutputColumns = Lists.newArrayList(); - for (Column column : mv.getFullSchema()) { - scanMvOutputColumns.add(scanMvOp.getColumnReference(column)); + OptExpression mvPlan = mvRewriteContext.getLogicalPlan(); + ScalarOperator mvPartialPartitionPredicates = null; + if (mv.getPartitionInfo() instanceof ExpressionRangePartitionInfo && !partitionNamesToRefresh.isEmpty()) { + // when mv is partitioned and there are some refreshed partitions, + // when should calculate latest partition range predicates for partition-by base table + mvPartialPartitionPredicates = getMvPartialPartitionPredicates(mv, mvPlan, partitionNamesToRefresh); + if (mvPartialPartitionPredicates == null) { + return; } - Preconditions.checkState(mvOutputColumns.size() == scanMvOutputColumns.size()); - - // construct output column mapping from mv sql to mv scan operator - // eg: for mv1 sql define: select a, (b + 1) as c2, (a * b) as c3 from table; - // select sql plan output columns: a, b + 1, a * b - // | | | - // v v V - // mv scan operator output columns: a, c2, c3 - Map outputMapping = Maps.newHashMap(); - for (int i = 0; i < mvOutputColumns.size(); i++) { - 
outputMapping.put(mvOutputColumns.get(i), scanMvOutputColumns.get(i)); - } - materializationContext.setOutputMapping(outputMapping); - context.addCandidateMvs(materializationContext); } + + List
<Table> baseTables = MvUtils.getAllTables(mvPlan); + List<Table>
intersectingTables = baseTables.stream().filter(queryTables::contains).collect(Collectors.toList()); + MaterializationContext materializationContext = + new MaterializationContext(context, mv, mvPlan, queryColumnRefFactory, + mv.getPlanContext().getRefFactory(), partitionNamesToRefresh, + baseTables, originQueryColumns, intersectingTables, mvPartialPartitionPredicates); + List mvOutputColumns = mv.getPlanContext().getOutputColumns(); + // generate scan mv plan here to reuse it in rule applications + LogicalOlapScanOperator scanMvOp = createScanMvOperator(materializationContext); + materializationContext.setScanMvOperator(scanMvOp); + String dbName = connectContext.getGlobalStateMgr().getDb(mv.getDbId()).getFullName(); + connectContext.getDumpInfo().addTable(dbName, mv); + // should keep the sequence of schema + List scanMvOutputColumns = Lists.newArrayList(); + for (Column column : mv.getFullSchema()) { + scanMvOutputColumns.add(scanMvOp.getColumnReference(column)); + } + Preconditions.checkState(mvOutputColumns.size() == scanMvOutputColumns.size()); + + // construct output column mapping from mv sql to mv scan operator + // eg: for mv1 sql define: select a, (b + 1) as c2, (a * b) as c3 from table; + // select sql plan output columns: a, b + 1, a * b + // | | | + // v v V + // mv scan operator output columns: a, c2, c3 + Map outputMapping = Maps.newHashMap(); + for (int i = 0; i < mvOutputColumns.size(); i++) { + outputMapping.put(mvOutputColumns.get(i), scanMvOutputColumns.get(i)); + } + materializationContext.setOutputMapping(outputMapping); + context.addCandidateMvs(materializationContext); } /**