Skip to content

Commit

Permalink
[Feature] Support write quorum property for table
Browse files Browse the repository at this point in the history
1. MAJORITY: write majority replica successfully returns success, default option
2. ONE: write one replica successfully returns success
3. ALL: write all replica successfully returns success
  • Loading branch information
meegoo committed Oct 11, 2022
1 parent 9aeaf02 commit 896e393
Show file tree
Hide file tree
Showing 40 changed files with 419 additions and 54 deletions.
4 changes: 3 additions & 1 deletion be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ void run_update_meta_info_task(std::shared_ptr<UpdateTabletMetaInfoAgentTaskRequ
case TTabletMetaType::INMEMORY:
// This property is no longer supported.
break;
case TTabletMetaType::WRITE_QUORUM:
break;
case TTabletMetaType::ENABLE_PERSISTENT_INDEX:
LOG(INFO) << "update tablet:" << tablet->tablet_id()
<< " enable_persistent_index:" << tablet_meta_info.enable_persistent_index;
Expand All @@ -694,4 +696,4 @@ void run_update_meta_info_task(std::shared_ptr<UpdateTabletMetaInfoAgentTaskRequ
unify_finish_agent_task(status_code, error_msgs, agent_task_req->task_type, agent_task_req->signature);
}

} // namespace starrocks
} // namespace starrocks
19 changes: 18 additions & 1 deletion be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ Status NodeChannel::init(RuntimeState* state) {
_add_batch_closures.emplace_back(closure);
}

if (_parent->_write_quorum_type == TWriteQuorumType::ONE) {
_write_quorum_type = WriteQuorumTypePB::ONE;
} else if (_parent->_write_quorum_type == TWriteQuorumType::ALL) {
_write_quorum_type = WriteQuorumTypePB::ALL;
}

// for get global_dict
_runtime_state = state;

Expand Down Expand Up @@ -163,6 +169,7 @@ void NodeChannel::_open(int64_t index_id, RefCountClosure<PTabletWriterOpenResul
request.set_is_lake_tablet(_parent->_is_lake_table);
request.set_is_replicated_storage(_parent->_enable_replicated_storage);
request.set_node_id(_node_id);
request.set_write_quorum(_write_quorum_type);
for (auto& tablet : _index_tablets_map[index_id]) {
auto ptablet = request.add_tablets();
ptablet->Swap(&tablet);
Expand Down Expand Up @@ -721,11 +728,18 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<PTabletWithPart
for (auto& it : _node_channels) {
RETURN_IF_ERROR(it.second->init(state));
}
_write_quorum_type = _parent->_write_quorum_type;
return Status::OK();
}

bool IndexChannel::has_intolerable_failure() {
return _failed_channels.size() >= ((_parent->_num_repicas + 1) / 2);
if (_write_quorum_type == TWriteQuorumType::ALL) {
return _failed_channels.size() > 0;
} else if (_write_quorum_type == TWriteQuorumType::ONE) {
return _failed_channels.size() >= _parent->_num_repicas;
} else {
return _failed_channels.size() >= ((_parent->_num_repicas + 1) / 2);
}
}

OlapTableSink::OlapTableSink(ObjectPool* pool, const std::vector<TExpr>& texprs, Status* status) : _pool(pool) {
Expand All @@ -747,6 +761,9 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
_tuple_desc_id = table_sink.tuple_id;
_is_lake_table = table_sink.is_lake_table;
_keys_type = table_sink.keys_type;
if (table_sink.__isset.write_quorum_type) {
_write_quorum_type = table_sink.write_quorum_type;
}
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_vectorized_partition = _pool->add(new vectorized::OlapTablePartitionParam(_schema, table_sink.partition));
Expand Down
16 changes: 15 additions & 1 deletion be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class NodeChannel {
RuntimeState* _runtime_state = nullptr;

bool _enable_colocate_mv_index = config::enable_load_colocate_mv;

WriteQuorumTypePB _write_quorum_type = WriteQuorumTypePB::MAJORITY;
};

class IndexChannel {
Expand Down Expand Up @@ -231,6 +233,8 @@ class IndexChannel {
std::unordered_map<int64_t, int64_t> _be_to_tablet_num;
// BeId
std::set<int64_t> _failed_channels;

TWriteQuorumType::type _write_quorum_type = TWriteQuorumType::MAJORITY;
};

// Write data to Olap Table.
Expand Down Expand Up @@ -305,7 +309,15 @@ class OlapTableSink : public DataSink {

void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); }
bool is_failed_channel(const NodeChannel* ch) { return _failed_channels.count(ch->node_id()) != 0; }
bool has_intolerable_failure() { return _failed_channels.size() >= ((_num_repicas + 1) / 2); }
bool has_intolerable_failure() {
if (_write_quorum_type == TWriteQuorumType::ALL) {
return _failed_channels.size() > 0;
} else if (_write_quorum_type == TWriteQuorumType::ONE) {
return _failed_channels.size() >= _num_repicas;
} else {
return _failed_channels.size() >= ((_num_repicas + 1) / 2);
}
}

void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
for (auto& it : _node_channels) {
Expand Down Expand Up @@ -416,6 +428,8 @@ class OlapTableSink : public DataSink {
bool _colocate_mv_index = config::enable_load_colocate_mv;

bool _enable_replicated_storage = false;

TWriteQuorumType::type _write_quorum_type = TWriteQuorumType::MAJORITY;
};

} // namespace stream_load
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
options.node_id = _node_id;
options.timeout_ms = params.timeout_ms();
options.is_replicated_storage = params.is_replicated_storage();
options.write_quorum = params.write_quorum();
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct DeltaWriterOptions {
bool is_replicated_storage = false;
std::vector<PNetworkAddress> replicas;
int64_t timeout_ms;
WriteQuorumTypePB write_quorum;
};

enum ReplicaState {
Expand Down
9 changes: 7 additions & 2 deletions be/src/storage/segment_replicate_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,13 @@ ReplicateToken::ReplicateToken(std::unique_ptr<ThreadPoolToken> replicate_pool_t
_replicate_channels.emplace_back(std::move(std::make_unique<ReplicateChannel>(
opt, opt->replicas[i].host(), opt->replicas[i].port(), opt->replicas[i].node_id())));
}
// default quorom policy
_max_fail_replica_num = opt->replicas.size() - (opt->replicas.size() / 2 + 1);
if (opt->write_quorum == WriteQuorumTypePB::ONE) {
_max_fail_replica_num = opt->replicas.size();
} else if (opt->write_quorum == WriteQuorumTypePB::ALL) {
_max_fail_replica_num = 0;
} else {
_max_fail_replica_num = opt->replicas.size() - (opt->replicas.size() / 2 + 1);
}
}

Status ReplicateToken::submit(std::unique_ptr<SegmentPB> segment, bool eos) {
Expand Down
11 changes: 7 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,21 +506,24 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
// currently, only in memory property and enable persistent index property could reach here
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX));
properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM));

OlapTable olapTable = (OlapTable) db.getTable(tableName);
if (olapTable.isLakeTable()) {
throw new DdlException("Lake table not support alter in_memory or enable_persistent_index");
throw new DdlException("Lake table not support alter in_memory or enable_persistent_index or write_quorum");
}

if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
((SchemaChangeHandler) schemaChangeHandler).updateTableMeta(db, tableName,
properties, TTabletMetaType.INMEMORY);
} else {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX)) {
((SchemaChangeHandler) schemaChangeHandler).updateTableMeta(db, tableName, properties,
TTabletMetaType.ENABLE_PERSISTENT_INDEX);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM)) {
((SchemaChangeHandler) schemaChangeHandler).updateTableMeta(db, tableName, properties,
TTabletMetaType.WRITE_QUORUM);
}
} else {
throw new DdlException("Invalid alter opertion: " + alterClause.getOpType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.ListComparator;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.WriteQuorum;
import com.starrocks.mysql.privilege.PrivPredicate;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.ShowResultSet;
Expand All @@ -86,6 +87,7 @@
import com.starrocks.thrift.TStorageFormat;
import com.starrocks.thrift.TTabletMetaType;
import com.starrocks.thrift.TTaskType;
import com.starrocks.thrift.TWriteQuorumType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -1099,6 +1101,8 @@ public AlterJobV2 analyzeAndCreateJob(List<AlterClause> alterClauses, Database d
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)) {
GlobalStateMgr.getCurrentState().modifyTableReplicationNum(db, olapTable, properties);
return null;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM)) {
return null;
}
}

Expand Down Expand Up @@ -1213,13 +1217,22 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (metaValue == olapTable.enablePersistentIndex()) {
return;
}
} else if (metaType == TTabletMetaType.WRITE_QUORUM) {
TWriteQuorumType writeQuorum = WriteQuorum
.findTWriteQuorumByName(properties.get(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM));
if (writeQuorum == olapTable.writeQuorum()) {
return;
}
} else {
LOG.warn("meta type: {} does not support", metaType);
return;
}

for (Partition partition : partitions) {
updatePartitionTabletMeta(db, olapTable.getName(), partition.getName(), metaValue, metaType);
if (metaType == TTabletMetaType.INMEMORY ||
metaType == TTabletMetaType.ENABLE_PERSISTENT_INDEX) {
for (Partition partition : partitions) {
updatePartitionTabletMeta(db, olapTable.getName(), partition.getName(), metaValue, metaType);
}
}

db.writeLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ public void updateMeta(String dbName, TTableMeta meta, List<TBackendMeta> backen
tableProperty.buildStorageFormat();
tableProperty.buildInMemory();
tableProperty.buildDynamicProperty();
tableProperty.buildWriteQuorum();

indexes = null;
if (meta.isSetIndex_infos()) {
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.starrocks.thrift.TStorageType;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import com.starrocks.thrift.TWriteQuorumType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -1633,6 +1634,23 @@ public void setEnablePersistentIndex(boolean enablePersistentIndex) {
tableProperty.buildEnablePersistentIndex();
}

public TWriteQuorumType writeQuorum() {
if (tableProperty != null) {
return tableProperty.writeQuorum();
}
return TWriteQuorumType.MAJORITY;
}

public void setWriteQuorum(String writeQuorum) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty
.modifyTableProperties(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM,
writeQuorum);
tableProperty.buildWriteQuorum();
}

public void setStorageMedium(TStorageMedium storageMedium) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.thrift.TStorageMedium;
import com.starrocks.thrift.TTabletType;
import com.starrocks.thrift.TWriteQuorumType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -104,8 +105,14 @@ public void setDataProperty(long partitionId, DataProperty newDataProperty) {
idToDataProperty.put(partitionId, newDataProperty);
}

public int getQuorumNum(long partitionId) {
return getReplicationNum(partitionId) / 2 + 1;
public int getQuorumNum(long partitionId, TWriteQuorumType writeQuorum) {
if (writeQuorum == TWriteQuorumType.ALL) {
return getReplicationNum(partitionId);
} else if (writeQuorum == TWriteQuorumType.ONE) {
return 1;
} else {
return getReplicationNum(partitionId) / 2 + 1;
}
}

public short getReplicationNum(long partitionId) {
Expand Down
19 changes: 19 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.WriteQuorum;
import com.starrocks.lake.StorageInfo;
import com.starrocks.persist.OperationType;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.thrift.TCompressionType;
import com.starrocks.thrift.TStorageFormat;
import com.starrocks.thrift.TWriteQuorumType;

import java.io.DataInput;
import java.io.DataOutput;
Expand Down Expand Up @@ -73,6 +75,9 @@ public class TableProperty implements Writable, GsonPostProcessable {
// the default compression type of this table.
private TCompressionType compressionType = TCompressionType.LZ4_FRAME;

// the default write quorum
private TWriteQuorumType writeQuorum = TWriteQuorumType.MAJORITY;

// 1. This table has been deleted. if hasDelete is false, the BE segment must don't have deleteConditions.
// If hasDelete is true, the BE segment maybe have deleteConditions because compaction.
// 2. Before checkpoint, we relay delete job journal log to persist.
Expand Down Expand Up @@ -111,6 +116,9 @@ public TableProperty buildProperty(short opCode) {
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
buildEnablePersistentIndex();
break;
case OperationType.OP_MODIFY_WRITE_QUORUM:
buildWriteQuorum();
break;
default:
break;
}
Expand Down Expand Up @@ -151,6 +159,12 @@ public TableProperty buildCompressionType() {
return this;
}

public TableProperty buildWriteQuorum() {
writeQuorum = WriteQuorum
.findTWriteQuorumByName(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_WRITE_QUORUM,
WriteQuorum.MAJORITY));
return this;
}
public TableProperty buildEnablePersistentIndex() {
enablePersistentIndex = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX, "false"));
Expand Down Expand Up @@ -185,6 +199,10 @@ public boolean enablePersistentIndex() {
return enablePersistentIndex;
}

public TWriteQuorumType writeQuorum() {
return writeQuorum;
}

public TStorageFormat getStorageFormat() {
return storageFormat;
}
Expand Down Expand Up @@ -234,5 +252,6 @@ public void gsonPostProcess() throws IOException {
buildStorageFormat();
buildEnablePersistentIndex();
buildCompressionType();
buildWriteQuorum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class PropertyAnalyzer {

public static final String PROPERTIES_ENABLE_PERSISTENT_INDEX = "enable_persistent_index";

public static final String PROPERTIES_WRITE_QUORUM = "write_quorum";

public static final String PROPERTIES_TABLET_TYPE = "tablet_type";

public static final String PROPERTIES_STRICT_RANGE = "strict_range";
Expand Down Expand Up @@ -447,6 +449,22 @@ public static TCompressionType analyzeCompressionType(Map<String, String> proper
}
}

// analyzeWriteQuorum will parse the write quorum from properties
public static String analyzeWriteQuorum(Map<String, String> properties) throws AnalysisException {
String writeQuorum;
if (properties == null || !properties.containsKey(PROPERTIES_WRITE_QUORUM)) {
return WriteQuorum.MAJORITY;
}
writeQuorum = properties.get(PROPERTIES_WRITE_QUORUM);
properties.remove(PROPERTIES_WRITE_QUORUM);

if (WriteQuorum.findTWriteQuorumByName(writeQuorum) != null) {
return writeQuorum;
} else {
throw new AnalysisException("unknown write quorum: " + writeQuorum);
}
}

// analyze common boolean properties, such as "in_memory" = "false"
public static boolean analyzeBooleanProp(Map<String, String> properties, String propKey, boolean defaultVal) {
if (properties != null && properties.containsKey(propKey)) {
Expand Down
Loading

0 comments on commit 896e393

Please sign in to comment.