Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Enhancement][Cherry-pick] make mv preprocess more robust for 2.5 (#19551) #19774

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,12 @@ private Set<String> getPartitionNamesToRefreshForPartitionedMv() {
Expr partitionExpr = getPartitionRefTableExprs().get(0);
Pair<Table, Column> partitionInfo = getPartitionTableAndColumn();
// if non-partition-by table has changed, should refresh all mv partitions
if (partitionInfo == null) {
// mark it inactive
setActive(false);
LOG.warn("mark mv:{} inactive for get partition info failed", name);
throw new RuntimeException(String.format("getting partition info failed for mv: %s", name));
}
Table partitionTable = partitionInfo.first;
boolean forceExternalTableQueryRewrite = isForceExternalTableQueryRewrite();
for (BaseTableInfo tableInfo : baseTableInfos) {
Expand Down Expand Up @@ -966,7 +972,10 @@ public Pair<Table, Column> getPartitionTableAndColumn() {
return Pair.create(table, table.getColumn(partitionSlotRef.getColumnName()));
}
}
return null;
String baseTableNames = baseTableInfos.stream()
.map(tableInfo -> tableInfo.getTable().getName()).collect(Collectors.joining(","));
throw new RuntimeException(
String.format("can not find partition info for mv:%s on base tables:%s", name, baseTableNames));
}

public MvRewriteContext getPlanContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,6 +42,7 @@
import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils.getMvPartialPartitionPredicates;

public class MvRewritePreprocessor {
private static final Logger LOG = LogManager.getLogger(MvRewritePreprocessor.class);
private final ConnectContext connectContext;
private final ColumnRefFactory queryColumnRefFactory;
private final OptimizerContext context;
Expand All @@ -64,77 +67,86 @@ public void prepareMvCandidatesForPlan() {

Set<ColumnRefOperator> originQueryColumns = Sets.newHashSet(queryColumnRefFactory.getColumnRefs());
for (MaterializedView mv : relatedMvs) {
if (!mv.isActive()) {
continue;
try {
preprocessMv(mv, queryTables, originQueryColumns);
} catch (Exception e) {
List<String> tableNames = queryTables.stream().map(Table::getName).collect(Collectors.toList());
LOG.warn("preprocess mv {} failed for query tables:{}", mv.getName(), tableNames, e);
}
}
}

MaterializedView.MvRewriteContext mvRewriteContext = mv.getPlanContext();
if (mvRewriteContext == null) {
// build mv query logical plan
MaterializedViewOptimizer mvOptimizer = new MaterializedViewOptimizer();
mvRewriteContext = mvOptimizer.optimize(mv, connectContext);
mv.setPlanContext(mvRewriteContext);
}
if (!mvRewriteContext.isValidMvPlan()) {
continue;
}
private void preprocessMv(MaterializedView mv, List<Table> queryTables, Set<ColumnRefOperator> originQueryColumns) {
if (!mv.isActive()) {
return;
}

Set<String> partitionNamesToRefresh = mv.getPartitionNamesToRefreshForMv();
PartitionInfo partitionInfo = mv.getPartitionInfo();
if (partitionInfo instanceof SinglePartitionInfo) {
if (!partitionNamesToRefresh.isEmpty()) {
continue;
}
} else if (!mv.getPartitionNames().isEmpty() &&
partitionNamesToRefresh.containsAll(mv.getPartitionNames())) {
                // if the mv is partitioned, and all partitions need refresh,
                // then it cannot be a candidate
continue;
}
MaterializedView.MvRewriteContext mvRewriteContext = mv.getPlanContext();
if (mvRewriteContext == null) {
// build mv query logical plan
MaterializedViewOptimizer mvOptimizer = new MaterializedViewOptimizer();
mvRewriteContext = mvOptimizer.optimize(mv, connectContext);
mv.setPlanContext(mvRewriteContext);
}
if (!mvRewriteContext.isValidMvPlan()) {
return;
}

OptExpression mvPlan = mvRewriteContext.getLogicalPlan();
ScalarOperator mvPartialPartitionPredicates = null;
if (mv.getPartitionInfo() instanceof ExpressionRangePartitionInfo && !partitionNamesToRefresh.isEmpty()) {
                // when the mv is partitioned and there are some refreshed partitions,
                // we should calculate the latest partition-range predicates for the partition-by base table
mvPartialPartitionPredicates = getMvPartialPartitionPredicates(mv, mvPlan, partitionNamesToRefresh);
if (mvPartialPartitionPredicates == null) {
continue;
}
Set<String> partitionNamesToRefresh = mv.getPartitionNamesToRefreshForMv();
PartitionInfo partitionInfo = mv.getPartitionInfo();
if (partitionInfo instanceof SinglePartitionInfo) {
if (!partitionNamesToRefresh.isEmpty()) {
return;
}
} else if (!mv.getPartitionNames().isEmpty() &&
partitionNamesToRefresh.containsAll(mv.getPartitionNames())) {
            // if the mv is partitioned, and all partitions need refresh,
            // then it cannot be a candidate
return;
}

List<Table> baseTables = MvUtils.getAllTables(mvPlan);
List<Table> intersectingTables = baseTables.stream().filter(queryTables::contains).collect(Collectors.toList());
MaterializationContext materializationContext =
new MaterializationContext(context, mv, mvPlan, queryColumnRefFactory,
mv.getPlanContext().getRefFactory(), partitionNamesToRefresh,
baseTables, originQueryColumns, intersectingTables, mvPartialPartitionPredicates);
List<ColumnRefOperator> mvOutputColumns = mv.getPlanContext().getOutputColumns();
// generate scan mv plan here to reuse it in rule applications
LogicalOlapScanOperator scanMvOp = createScanMvOperator(materializationContext);
materializationContext.setScanMvOperator(scanMvOp);
String dbName = connectContext.getGlobalStateMgr().getDb(mv.getDbId()).getFullName();
connectContext.getDumpInfo().addTable(dbName, mv);
// should keep the sequence of schema
List<ColumnRefOperator> scanMvOutputColumns = Lists.newArrayList();
for (Column column : mv.getFullSchema()) {
scanMvOutputColumns.add(scanMvOp.getColumnReference(column));
OptExpression mvPlan = mvRewriteContext.getLogicalPlan();
ScalarOperator mvPartialPartitionPredicates = null;
if (mv.getPartitionInfo() instanceof ExpressionRangePartitionInfo && !partitionNamesToRefresh.isEmpty()) {
            // when the mv is partitioned and there are some refreshed partitions,
            // we should calculate the latest partition-range predicates for the partition-by base table
mvPartialPartitionPredicates = getMvPartialPartitionPredicates(mv, mvPlan, partitionNamesToRefresh);
if (mvPartialPartitionPredicates == null) {
return;
}
Preconditions.checkState(mvOutputColumns.size() == scanMvOutputColumns.size());

// construct output column mapping from mv sql to mv scan operator
// eg: for mv1 sql define: select a, (b + 1) as c2, (a * b) as c3 from table;
// select sql plan output columns: a, b + 1, a * b
// | | |
// v v V
// mv scan operator output columns: a, c2, c3
Map<ColumnRefOperator, ColumnRefOperator> outputMapping = Maps.newHashMap();
for (int i = 0; i < mvOutputColumns.size(); i++) {
outputMapping.put(mvOutputColumns.get(i), scanMvOutputColumns.get(i));
}
materializationContext.setOutputMapping(outputMapping);
context.addCandidateMvs(materializationContext);
}

List<Table> baseTables = MvUtils.getAllTables(mvPlan);
List<Table> intersectingTables = baseTables.stream().filter(queryTables::contains).collect(Collectors.toList());
MaterializationContext materializationContext =
new MaterializationContext(context, mv, mvPlan, queryColumnRefFactory,
mv.getPlanContext().getRefFactory(), partitionNamesToRefresh,
baseTables, originQueryColumns, intersectingTables, mvPartialPartitionPredicates);
List<ColumnRefOperator> mvOutputColumns = mv.getPlanContext().getOutputColumns();
// generate scan mv plan here to reuse it in rule applications
LogicalOlapScanOperator scanMvOp = createScanMvOperator(materializationContext);
materializationContext.setScanMvOperator(scanMvOp);
String dbName = connectContext.getGlobalStateMgr().getDb(mv.getDbId()).getFullName();
connectContext.getDumpInfo().addTable(dbName, mv);
// should keep the sequence of schema
List<ColumnRefOperator> scanMvOutputColumns = Lists.newArrayList();
for (Column column : mv.getFullSchema()) {
scanMvOutputColumns.add(scanMvOp.getColumnReference(column));
}
Preconditions.checkState(mvOutputColumns.size() == scanMvOutputColumns.size());

// construct output column mapping from mv sql to mv scan operator
// eg: for mv1 sql define: select a, (b + 1) as c2, (a * b) as c3 from table;
// select sql plan output columns: a, b + 1, a * b
// | | |
// v v V
// mv scan operator output columns: a, c2, c3
Map<ColumnRefOperator, ColumnRefOperator> outputMapping = Maps.newHashMap();
for (int i = 0; i < mvOutputColumns.size(); i++) {
outputMapping.put(mvOutputColumns.get(i), scanMvOutputColumns.get(i));
}
materializationContext.setOutputMapping(outputMapping);
context.addCandidateMvs(materializationContext);
}

/**
Expand Down