Skip to content

Commit

Permalink
[Enhancement] Support alter materialized view to active (StarRocks#24001
Browse files Browse the repository at this point in the history
)

Fixes StarRocks#23304

---------

Signed-off-by: Astralidea <[email protected]>
  • Loading branch information
Astralidea authored and abc982627271 committed Jun 5, 2023
1 parent 205d6d5 commit 26fdf28
Show file tree
Hide file tree
Showing 17 changed files with 501 additions and 115 deletions.
78 changes: 78 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.TableRef;
import com.starrocks.authentication.AuthenticationManager;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.ColocateTableIndex;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DataProperty;
Expand Down Expand Up @@ -68,6 +71,7 @@
import com.starrocks.common.UserException;
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.persist.AlterMaterializedViewStatusLog;
import com.starrocks.persist.AlterViewInfo;
import com.starrocks.persist.BatchModifyPartitionsInfo;
import com.starrocks.persist.ChangeMaterializedViewRefreshSchemeLog;
Expand All @@ -79,6 +83,7 @@
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockWriter;
import com.starrocks.privilege.PrivilegeBuiltinConstants;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.ShowResultSet;
import com.starrocks.scheduler.Constants;
Expand All @@ -87,6 +92,9 @@
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.mv.MVManager;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.analyzer.SetStmtAnalyzer;
import com.starrocks.sql.ast.AddPartitionClause;
import com.starrocks.sql.ast.AlterClause;
Expand All @@ -106,17 +114,21 @@
import com.starrocks.sql.ast.ModifyPartitionClause;
import com.starrocks.sql.ast.ModifyTablePropertiesClause;
import com.starrocks.sql.ast.PartitionRenameClause;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.RefreshSchemeDesc;
import com.starrocks.sql.ast.ReplacePartitionClause;
import com.starrocks.sql.ast.RollupRenameClause;
import com.starrocks.sql.ast.SetListItem;
import com.starrocks.sql.ast.SetStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.SwapTableClause;
import com.starrocks.sql.ast.SystemVariable;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.ast.TruncatePartitionClause;
import com.starrocks.sql.ast.TruncateTableStmt;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TTabletMetaType;
import com.starrocks.thrift.TTabletType;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -264,6 +276,7 @@ public void processAlterMaterializedView(AlterMaterializedViewStmt stmt)
final String oldMvName = mvName.getTbl();
final String newMvName = stmt.getNewMvName();
final RefreshSchemeDesc refreshSchemeDesc = stmt.getRefreshSchemeDesc();
final String status = stmt.getStatus();
ModifyTablePropertiesClause modifyTablePropertiesClause = stmt.getModifyTablePropertiesClause();
String dbName = mvName.getDb();
Database db = GlobalStateMgr.getCurrentState().getDb(dbName);
Expand Down Expand Up @@ -303,6 +316,29 @@ public void processAlterMaterializedView(AlterMaterializedViewStmt stmt)
} catch (AnalysisException ae) {
throw new DdlException(ae.getMessage());
}
} else if (status != null) {
if (AlterMaterializedViewStmt.ACTIVE.equalsIgnoreCase(status)) {
if (materializedView.isActive()) {
return;
}
processChangeMaterializedViewStatus(materializedView, status);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.refreshMaterializedView(dbName, materializedView.getName(), true, null,
Constants.TaskRunPriority.NORMAL.value(), true, false);
} else if (AlterMaterializedViewStmt.INACTIVE.equalsIgnoreCase(status)) {
if (!materializedView.isActive()) {
return;
}
LOG.warn("Setting the materialized view {}({}) to inactive because " +
"user use alter materialized view set status to inactive",
materializedView.getName(), materializedView.getId());
processChangeMaterializedViewStatus(materializedView, status);
} else {
throw new DdlException("Unsupported modification materialized view status:" + status);
}
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(materializedView.getDbId(),
materializedView.getId(), status);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
} else {
throw new DdlException("Unsupported modification for materialized view");
}
Expand All @@ -313,6 +349,48 @@ public void processAlterMaterializedView(AlterMaterializedViewStmt stmt)
}
}

private void processChangeMaterializedViewStatus(MaterializedView materializedView, String status) {
if (AlterMaterializedViewStmt.ACTIVE.equalsIgnoreCase(status)) {
String viewDefineSql = materializedView.getViewDefineSql();
ConnectContext context = new ConnectContext();
context.setQualifiedUser(AuthenticationManager.ROOT_USER);
context.setCurrentUserIdentity(UserIdentity.ROOT);
context.setCurrentRoleIds(Sets.newHashSet(PrivilegeBuiltinConstants.ROOT_ROLE_ID));

List<StatementBase> statementBaseList = SqlParser.parse(viewDefineSql, context.getSessionVariable());
QueryStatement queryStatement = (QueryStatement) statementBaseList.get(0);
try {
Analyzer.analyze(queryStatement, context);
} catch (SemanticException e) {
throw new SemanticException("Can not active materialized view [" + materializedView.getName() +
"] because analyze materialized view define sql: \n\n" + viewDefineSql +
"\n\nCause an error: " + e.getDetailMsg());
}

List<BaseTableInfo> baseTableInfos = MaterializedViewAnalyzer.getAndCheckBaseTables(queryStatement);
materializedView.setBaseTableInfos(baseTableInfos);
materializedView.getRefreshScheme().getAsyncRefreshContext().clearVisibleVersionMap();
GlobalStateMgr.getCurrentState().updateBaseTableRelatedMv(materializedView.getDbId(),
materializedView, baseTableInfos);
materializedView.setActive(true);
} else if (AlterMaterializedViewStmt.INACTIVE.equalsIgnoreCase(status)) {
materializedView.setInactiveAndReason("user use alter materialized view set status to inactive");
}
}

public void replayAlterMaterializedViewStatus(AlterMaterializedViewStatusLog log) {
long dbId = log.getDbId();
long tableId = log.getTableId();
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
db.writeLock();
try {
MaterializedView mv = (MaterializedView) db.getTable(tableId);
processChangeMaterializedViewStatus(mv, log.getStatus());
} finally {
db.writeUnlock();
}
}

private void processModifyTableProperties(ModifyTablePropertiesClause modifyTablePropertiesClause,
Database db,
MaterializedView materializedView) throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ public Map<BaseTableInfo, Map<String, BasePartitionInfo>> getBaseTableInfoVisibl
return baseTableInfoVisibleVersionMap;
}

public void clearVisibleVersionMap() {
this.baseTableInfoVisibleVersionMap.clear();
this.baseTableVisibleVersionMap.clear();
}

public boolean isDefineStartTime() {
return defineStartTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.starrocks.persist.AddPartitionsInfo;
import com.starrocks.persist.AddPartitionsInfoV2;
import com.starrocks.persist.AlterLoadJobOperationLog;
import com.starrocks.persist.AlterMaterializedViewStatusLog;
import com.starrocks.persist.AlterRoutineLoadJobOperationLog;
import com.starrocks.persist.AlterUserInfo;
import com.starrocks.persist.AlterViewInfo;
Expand Down Expand Up @@ -323,6 +324,10 @@ public void readFields(DataInput in) throws IOException {
data = RenameMaterializedViewLog.read(in);
isRead = true;
break;
case OperationType.OP_ALTER_MATERIALIZED_VIEW_STATUS:
data = AlterMaterializedViewStatusLog.read(in);
isRead = true;
break;
case OperationType.OP_BACKUP_JOB: {
data = AbstractJob.read(in);
isRead = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.starrocks.persist;

import com.google.gson.annotations.SerializedName;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class AlterMaterializedViewStatusLog implements Writable {

@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "status")
private String status;

public AlterMaterializedViewStatusLog(long dbId, long tableId, String status) {
this.dbId = dbId;
this.tableId = tableId;
this.status = status;
}

public long getDbId() {
return dbId;
}

public void setDbId(long dbId) {
this.dbId = dbId;
}

public long getTableId() {
return tableId;
}

public void setTableId(long tableId) {
this.tableId = tableId;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

public static AlterMaterializedViewStatusLog read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), AlterMaterializedViewStatusLog.class);
}

}
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ public static void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity jour
globalStateMgr.replayAlterMaterializedViewProperties(opCode, log);
break;
}
case OperationType.OP_ALTER_MATERIALIZED_VIEW_STATUS: {
AlterMaterializedViewStatusLog log =
(AlterMaterializedViewStatusLog) journal.getData();
globalStateMgr.replayAlterMaterializedViewStatus(log);
break;
}
case OperationType.OP_RENAME_MATERIALIZED_VIEW: {
RenameMaterializedViewLog log = (RenameMaterializedViewLog) journal.getData();
globalStateMgr.replayRenameMaterializedView(log);
Expand Down Expand Up @@ -1629,6 +1635,10 @@ public void logInsertOverwriteStateChange(InsertOverwriteStateChangeInfo info) {
logEdit(OperationType.OP_INSERT_OVERWRITE_STATE_CHANGE, info);
}

public void logAlterMvStatus(AlterMaterializedViewStatusLog log) {
logEdit(OperationType.OP_ALTER_MATERIALIZED_VIEW_STATUS, log);
}

public void logMvRename(RenameMaterializedViewLog log) {
logEdit(OperationType.OP_RENAME_MATERIALIZED_VIEW, log);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public class OperationType {
public static final short OP_CREATE_MATERIALIZED_VIEW = 10094;
public static final short OP_CREATE_INSERT_OVERWRITE = 10095;
public static final short OP_INSERT_OVERWRITE_STATE_CHANGE = 10096;
public static final short OP_ALTER_MATERIALIZED_VIEW_STATUS = 10097;

// manage system node info 10101 ~ 10120
public static final short OP_UPDATE_FRONTEND = 10101;
Expand Down
62 changes: 37 additions & 25 deletions fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
import com.starrocks.mysql.privilege.Auth;
import com.starrocks.mysql.privilege.AuthUpgrader;
import com.starrocks.mysql.privilege.PrivPredicate;
import com.starrocks.persist.AlterMaterializedViewStatusLog;
import com.starrocks.persist.AuthUpgradeInfo;
import com.starrocks.persist.BackendIdsUpdateInfo;
import com.starrocks.persist.BackendTabletsInfo;
Expand Down Expand Up @@ -1566,38 +1567,43 @@ private void processMvRelatedMeta() {
for (String dbName : dbNames) {
Database db = metadataMgr.getDb(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, dbName);
for (MaterializedView mv : db.getMaterializedViews()) {
for (BaseTableInfo baseTableInfo : mv.getBaseTableInfos()) {
Table table = baseTableInfo.getTable();
if (table == null) {
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the table {} was not exist.", mv.getName(), mv.getId(), baseTableInfo.getTableName());
mv.setInactiveAndReason("base table dropped: " + baseTableInfo.getTableId());
continue;
}
if (table instanceof MaterializedView && !((MaterializedView) table).isActive()) {
MaterializedView baseMv = (MaterializedView) table;
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the materialized view{}({}) is invalid.", mv.getName(), mv.getId(),
baseMv.getName(), baseMv.getId());
mv.setInactiveAndReason("base mv is not active: " + baseMv.getName());
continue;
}
MvId mvId = new MvId(db.getId(), mv.getId());
table.addRelatedMaterializedView(mvId);
if (!table.isNativeTableOrMaterializedView()) {
connectorTblMetaInfoMgr.addConnectorTableInfo(baseTableInfo.getCatalogName(),
baseTableInfo.getDbName(), baseTableInfo.getTableIdentifier(),
ConnectorTableInfo.builder().setRelatedMaterializedViews(
Sets.newHashSet(mvId)).build());
}
}
List<BaseTableInfo> baseTableInfos = mv.getBaseTableInfos();
updateBaseTableRelatedMv(db.getId(), mv, baseTableInfos);
}
}

long duration = System.currentTimeMillis() - startMillis;
LOG.info("finish processing all tables' related materialized views in {}ms", duration);
}

public void updateBaseTableRelatedMv(Long dbId, MaterializedView mv, List<BaseTableInfo> baseTableInfos) {
for (BaseTableInfo baseTableInfo : baseTableInfos) {
Table table = baseTableInfo.getTable();
if (table == null) {
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the table {} was not exist.", mv.getName(), mv.getId(), baseTableInfo.getTableName());
mv.setInactiveAndReason("base table dropped: " + baseTableInfo.getTableId());
continue;
}
if (table instanceof MaterializedView && !((MaterializedView) table).isActive()) {
MaterializedView baseMv = (MaterializedView) table;
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the materialized view{}({}) is invalid.", mv.getName(), mv.getId(),
baseMv.getName(), baseMv.getId());
mv.setInactiveAndReason("base mv is not active: " + baseMv.getName());
continue;
}
MvId mvId = new MvId(dbId, mv.getId());
table.addRelatedMaterializedView(mvId);
if (!table.isNativeTableOrMaterializedView()) {
connectorTblMetaInfoMgr.addConnectorTableInfo(baseTableInfo.getCatalogName(),
baseTableInfo.getDbName(), baseTableInfo.getTableIdentifier(),
ConnectorTableInfo.builder().setRelatedMaterializedViews(
Sets.newHashSet(mvId)).build());
}
}
}

public long loadVersion(DataInputStream dis, long checksum) throws IOException {
// for new format, version schema is [starrocksMetaVersion], and the int value must be positive
// for old format, version schema is [-1, metaVersion, starrocksMetaVersion]
Expand Down Expand Up @@ -3291,6 +3297,12 @@ public void replayAlterMaterializedViewProperties(short opCode, ModifyTablePrope
this.alterJobMgr.replayAlterMaterializedViewProperties(opCode, log);
}


public void replayAlterMaterializedViewStatus(AlterMaterializedViewStatusLog log) {
this.alterJobMgr.replayAlterMaterializedViewStatus(log);
}


/*
* used for handling CancelAlterStmt (for client is the CANCEL ALTER
* command). including SchemaChangeHandler and RollupHandler
Expand Down
Loading

0 comments on commit 26fdf28

Please sign in to comment.