Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,9 @@ public void stop() {
public static RaftProperties newRaftProperties(ConfigurationSource conf,
int port, String ratisStorageDir) {
// Set RPC type
final String rpcType = conf.get(
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(conf.get(
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT));
final RaftProperties properties = RatisHelper.newRaftProperties(rpc);

// Set the ratis port number
Expand All @@ -617,8 +616,7 @@ public static RaftProperties newRaftProperties(ConfigurationSource conf,
}

// Set Ratis storage directory
RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(new File(ratisStorageDir)));
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(new File(ratisStorageDir)));

final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
Expand All @@ -645,128 +643,93 @@ public static RaftProperties newRaftProperties(ConfigurationSource conf,

private static void setRaftLeaderElectionProperties(RaftProperties properties, ConfigurationSource conf) {
// Disable/enable the pre vote feature in Ratis
RaftServerConfigKeys.LeaderElection.setPreVote(properties,
conf.getBoolean(OMConfigKeys.OZONE_OM_RATIS_SERVER_ELECTION_PRE_VOTE,
OMConfigKeys.OZONE_OM_RATIS_SERVER_ELECTION_PRE_VOTE_DEFAULT));
RaftServerConfigKeys.LeaderElection.setPreVote(properties, conf.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_SERVER_ELECTION_PRE_VOTE,
OMConfigKeys.OZONE_OM_RATIS_SERVER_ELECTION_PRE_VOTE_DEFAULT));
}

private static void setRaftLogProperties(RaftProperties properties,
int logAppenderQueueByteLimit, ConfigurationSource conf) {
// Set RAFT segment size
final long raftSegmentSize = (long) conf.getStorageSize(
RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf((long) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY,
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
SizeInBytes.valueOf(raftSegmentSize));
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_DEFAULT, StorageUnit.BYTES)));

// Set to enable RAFT to purge logs up to Snapshot Index
RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties,
conf.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_UPTO_SNAPSHOT_INDEX,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_UPTO_SNAPSHOT_INDEX_DEFAULT
)
);
RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, conf.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_UPTO_SNAPSHOT_INDEX,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_UPTO_SNAPSHOT_INDEX_DEFAULT));

// Set number of last RAFT logs to not be purged
RaftServerConfigKeys.Log.setPurgePreservationLogNum(properties,
conf.getLong(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_PRESERVATION_LOG_NUM,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_PRESERVATION_LOG_NUM_DEFAULT
)
);
RaftServerConfigKeys.Log.setPurgePreservationLogNum(properties, conf.getLong(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_PRESERVATION_LOG_NUM,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_PRESERVATION_LOG_NUM_DEFAULT));

// Set RAFT segment pre-allocated size
final long raftSegmentPreallocatedSize = (long) conf.getStorageSize(
RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf((long) conf.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
StorageUnit.BYTES);
int logAppenderQueueNumElements = conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT, StorageUnit.BYTES)));

// Set RAFT buffer element limit
RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties, conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
logAppenderQueueNumElements);
RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setWriteBufferSize(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit + 8));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
false);
final int logPurgeGap = conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT));

RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setWriteBufferSize(properties, SizeInBytes.valueOf(logAppenderQueueByteLimit + 8));
RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, false);

RaftServerConfigKeys.Log.setPurgeGap(properties, conf.getInt(
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT));

// Set the number of maximum cached segments
RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
}

private static void setGrpcConfig(RaftProperties properties, int logAppenderQueueByteLimit) {
// For grpc set the maximum message size
// TODO: calculate the optimal max message size
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(logAppenderQueueByteLimit));
}

private static void setRaftRpcProperties(RaftProperties properties, ConfigurationSource conf) {
// Set the server request timeout
TimeUnit serverRequestTimeoutUnit =
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
long serverRequestTimeoutDuration = conf.getTimeDuration(
TimeUnit serverRequestTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
final TimeDuration serverRequestTimeout = TimeDuration.valueOf(conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
.getDuration(), serverRequestTimeoutUnit);
final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
serverRequestTimeoutDuration, serverRequestTimeoutUnit);
RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
serverRequestTimeout);
OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getDuration(), serverRequestTimeoutUnit),
serverRequestTimeoutUnit);
RaftServerConfigKeys.Rpc.setRequestTimeout(properties, serverRequestTimeout);

// Set the server min and max timeout
TimeUnit serverMinTimeoutUnit =
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
long serverMinTimeoutDuration = conf.getTimeDuration(
TimeUnit serverMinTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
final TimeDuration serverMinTimeout = TimeDuration.valueOf(conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
.getDuration(), serverMinTimeoutUnit);
final TimeDuration serverMinTimeout = TimeDuration.valueOf(
serverMinTimeoutDuration, serverMinTimeoutUnit);
long serverMaxTimeoutDuration =
serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
serverMaxTimeoutDuration, TimeUnit.MILLISECONDS);
RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
serverMinTimeout);
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
serverMaxTimeout);
OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getDuration(), serverMinTimeoutUnit),
serverMinTimeoutUnit);
final TimeDuration serverMaxTimeout = serverMinTimeout.add(200, TimeUnit.MILLISECONDS);
RaftServerConfigKeys.Rpc.setTimeoutMin(properties, serverMinTimeout);
RaftServerConfigKeys.Rpc.setTimeoutMax(properties, serverMaxTimeout);

// Set the server Rpc slowness timeout and Notification noLeader timeout
TimeUnit nodeFailureTimeoutUnit =
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
.getUnit();
long nodeFailureTimeoutDuration = conf.getTimeDuration(
TimeUnit nodeFailureTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.getUnit();
final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
.getDuration(), nodeFailureTimeoutUnit);
final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
nodeFailureTimeout);
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
nodeFailureTimeout);
OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.getDuration(), nodeFailureTimeoutUnit),
nodeFailureTimeoutUnit);
RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties, nodeFailureTimeout);
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, nodeFailureTimeout);
}

private static void setRaftRetryCacheProperties(RaftProperties properties, ConfigurationSource conf) {
// Set timeout for server retry cache entry
TimeUnit retryCacheTimeoutUnit = OMConfigKeys
.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
long retryCacheTimeoutDuration = conf.getTimeDuration(
TimeUnit retryCacheTimeoutUnit = OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
final TimeDuration retryCacheTimeout = TimeDuration.valueOf(conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
.getDuration(), retryCacheTimeoutUnit);
final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
retryCacheTimeoutDuration, retryCacheTimeoutUnit);
RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
retryCacheTimeout);
OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getDuration(), retryCacheTimeoutUnit),
retryCacheTimeoutUnit);
RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheTimeout);
}

private static void setRaftSnapshotProperties(RaftProperties properties, ConfigurationSource conf) {
Expand All @@ -779,15 +742,11 @@ private static void setRaftSnapshotProperties(RaftProperties properties, Configu
// The transaction info value in OM DB is used as
// snapshot value after restart.

RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
properties, true);
RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);

long snapshotAutoTriggerThreshold = conf.getLong(
RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, conf.getLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT);

RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties,
snapshotAutoTriggerThreshold);
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT));
}

private static void setRaftCloseThreshold(RaftProperties properties, ConfigurationSource conf) {
Expand Down