Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
// port.
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}

// Add credential context to the client call
Expand Down Expand Up @@ -282,6 +282,7 @@ public ContainerCommandResponseProto sendCommand(
}
for (DatanodeDetails dn : datanodeList) {
try {
request = reconstructRequestIfNeeded(request, dn);
futureHashMap.put(dn, sendCommandAsync(request, dn).getResponse());
} catch (InterruptedException e) {
LOG.error("Command execution was interrupted.");
Expand Down Expand Up @@ -313,6 +314,29 @@ public ContainerCommandResponseProto sendCommand(
return responseProtoHashMap;
}

/**
 * Rebuilds a getBlock request with the replica index of the target datanode
 * when the pipeline uses EC replication. EC reads require each request to
 * carry the replicaIndex for the specific DN it is sent to, so the proto is
 * unpacked and reassembled with that field set. Non-EC pipelines and
 * non-getBlock requests are returned untouched.
 *
 * @param request the container command request about to be sent
 * @param dn the datanode the request is addressed to
 * @return the original request, or a copy with replicaIndex populated
 */
private ContainerCommandRequestProto reconstructRequestIfNeeded(
    ContainerCommandRequestProto request, DatanodeDetails dn) {
  final boolean ecPipeline = pipeline.getReplicationConfig()
      .getReplicationType() == HddsProtos.ReplicationType.EC;
  // Guard clause: only EC getBlock requests need the replica index stamped.
  if (!ecPipeline || !request.hasGetBlock()) {
    return request;
  }
  ContainerProtos.GetBlockRequestProto getBlock = request.getGetBlock();
  // Generated proto setters accept sub-builders directly, so the nested
  // messages can be rebuilt in one expression.
  return request.toBuilder()
      .setGetBlock(getBlock.toBuilder()
          .setBlockID(getBlock.getBlockID().toBuilder()
              .setReplicaIndex(pipeline.getReplicaIndex(dn))))
      .build();
}

@Override
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request, List<Validator> validators)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
.get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf), trustManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;

import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys;

import static java.util.Collections.unmodifiableSortedSet;
Expand Down Expand Up @@ -332,7 +333,67 @@ private static void addDeprecatedKeys() {
new DeprecationDelta("ozone.scm.chunk.layout",
ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY),
new DeprecationDelta("hdds.datanode.replication.work.dir",
OZONE_CONTAINER_COPY_WORKDIR)
OZONE_CONTAINER_COPY_WORKDIR),
new DeprecationDelta("dfs.container.chunk.write.sync",
OzoneConfigKeys.HDDS_CONTAINER_CHUNK_WRITE_SYNC_KEY),
new DeprecationDelta("dfs.container.ipc",
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT),
new DeprecationDelta("dfs.container.ipc.random.port",
OzoneConfigKeys.HDDS_CONTAINER_IPC_RANDOM_PORT),
new DeprecationDelta("dfs.container.ratis.admin.port",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_ADMIN_PORT),
new DeprecationDelta("dfs.container.ratis.datanode.storage.dir",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR),
new DeprecationDelta("dfs.container.ratis.datastream.enabled",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED),
new DeprecationDelta("dfs.container.ratis.datastream.port",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_PORT),
new DeprecationDelta("dfs.container.ratis.datastream.random.port",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT),
new DeprecationDelta("dfs.container.ratis.enabled",
ScmConfigKeys.HDDS_CONTAINER_RATIS_ENABLED_KEY),
new DeprecationDelta("dfs.container.ratis.ipc",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT),
new DeprecationDelta("dfs.container.ratis.ipc.random.port",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_RANDOM_PORT),
new DeprecationDelta("dfs.container.ratis.leader.pending.bytes.limit",
ScmConfigKeys.HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT),
new DeprecationDelta("dfs.container.ratis.log.appender.queue.byte-limit",
ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT),
new DeprecationDelta("dfs.container.ratis.log.appender.queue.num-elements",
ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS),
new DeprecationDelta("dfs.container.ratis.log.purge.gap",
ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_PURGE_GAP),
new DeprecationDelta("dfs.container.ratis.log.queue.byte-limit",
ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT),
new DeprecationDelta("dfs.container.ratis.log.queue.num-elements",
ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS),
new DeprecationDelta("dfs.container.ratis.num.container.op.executors",
ScmConfigKeys.HDDS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY),
new DeprecationDelta("dfs.container.ratis.num.write.chunk.threads.per.volume",
ScmConfigKeys.HDDS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME),
new DeprecationDelta("dfs.container.ratis.replication.level",
ScmConfigKeys.HDDS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY),
new DeprecationDelta("dfs.container.ratis.rpc.type",
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY),
new DeprecationDelta("dfs.container.ratis.segment.preallocated.size",
ScmConfigKeys.HDDS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY),
new DeprecationDelta("dfs.container.ratis.segment.size",
ScmConfigKeys.HDDS_CONTAINER_RATIS_SEGMENT_SIZE_KEY),
new DeprecationDelta("dfs.container.ratis.server.port",
OzoneConfigKeys.HDDS_CONTAINER_RATIS_SERVER_PORT),
new DeprecationDelta("dfs.container.ratis.statemachinedata.sync.retries",
ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES),
new DeprecationDelta("dfs.container.ratis.statemachinedata.sync.timeout",
ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT),
new DeprecationDelta("dfs.container.ratis.statemachine.max.pending.apply-transactions",
ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS),
new DeprecationDelta("dfs.ratis.leader.election.minimum.timeout.duration",
ScmConfigKeys.HDDS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY),
new DeprecationDelta("dfs.ratis.server.retry-cache.timeout.duration",
ScmConfigKeys.HDDS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY),
new DeprecationDelta("dfs.ratis.snapshot.threshold",
ScmConfigKeys.HDDS_RATIS_SNAPSHOT_THRESHOLD_KEY)
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ public static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,

private static RpcType getRpcType(ConfigurationSource conf) {
return SupportedRpcType.valueOfIgnoreCase(conf.get(
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT));
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT));
}

public static BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,95 +41,95 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_DB_DIRS_PERMISSIONS =
"ozone.scm.db.dirs.permissions";

public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
public static final String HDDS_CONTAINER_RATIS_ENABLED_KEY
= "hdds.container.ratis.enabled";
public static final boolean HDDS_CONTAINER_RATIS_ENABLED_DEFAULT
= false;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= "dfs.container.ratis.rpc.type";
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
public static final String HDDS_CONTAINER_RATIS_RPC_TYPE_KEY
= "hdds.container.ratis.rpc.type";
public static final String HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
= "GRPC";
public static final String
DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME
= "dfs.container.ratis.num.write.chunk.threads.per.volume";
HDDS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME
= "hdds.container.ratis.num.write.chunk.threads.per.volume";
public static final int
DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_DEFAULT
HDDS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_DEFAULT
= 10;
public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
= "dfs.container.ratis.replication.level";
public static final String HDDS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
= "hdds.container.ratis.replication.level";
public static final ReplicationLevel
DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
public static final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY
= "dfs.container.ratis.num.container.op.executors";
public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT
HDDS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
public static final String HDDS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY
= "hdds.container.ratis.num.container.op.executors";
public static final int HDDS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT
= 10;
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"dfs.container.ratis.segment.size";
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
public static final String HDDS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"hdds.container.ratis.segment.size";
public static final String HDDS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
"64MB";
public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
"dfs.container.ratis.segment.preallocated.size";
public static final String HDDS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
"hdds.container.ratis.segment.preallocated.size";
public static final String
DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = "4MB";
HDDS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = "4MB";
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT =
"dfs.container.ratis.statemachinedata.sync.timeout";
HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT =
"hdds.container.ratis.statemachinedata.sync.timeout";
public static final TimeDuration
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT =
HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT =
TimeDuration.valueOf(10, TimeUnit.SECONDS);
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES =
"dfs.container.ratis.statemachinedata.sync.retries";
HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES =
"hdds.container.ratis.statemachinedata.sync.retries";
public static final String
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS =
"dfs.container.ratis.statemachine.max.pending.apply-transactions";
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS =
"hdds.container.ratis.statemachine.max.pending.apply-transactions";
// The default value of maximum number of pending state machine apply
// transactions is kept same as default snapshot threshold.
public static final int
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT =
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT =
100000;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS =
"dfs.container.ratis.log.queue.num-elements";
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT =
public static final String HDDS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS =
"hdds.container.ratis.log.queue.num-elements";
public static final int HDDS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT =
1024;
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT =
"dfs.container.ratis.log.queue.byte-limit";
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT =
public static final String HDDS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT =
"hdds.container.ratis.log.queue.byte-limit";
public static final String HDDS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT =
"4GB";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
"dfs.container.ratis.log.appender.queue.num-elements";
HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
"hdds.container.ratis.log.appender.queue.num-elements";
public static final int
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1;
public static final String DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
"dfs.container.ratis.log.appender.queue.byte-limit";
HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1;
public static final String HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
"hdds.container.ratis.log.appender.queue.byte-limit";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
"dfs.container.ratis.log.purge.gap";
HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String HDDS_CONTAINER_RATIS_LOG_PURGE_GAP =
"hdds.container.ratis.log.purge.gap";
// TODO: Set to 1024 once RATIS issue around purge is fixed.
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
public static final int HDDS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
1000000;
public static final String DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT =
"dfs.container.ratis.leader.pending.bytes.limit";
public static final String HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT =
"hdds.container.ratis.leader.pending.bytes.limit";
public static final String
DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";
HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";

public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
"dfs.ratis.server.retry-cache.timeout.duration";
public static final String HDDS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
"hdds.ratis.server.retry-cache.timeout.duration";
public static final TimeDuration
DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT =
HDDS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(600000, TimeUnit.MILLISECONDS);
public static final String
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
"dfs.ratis.leader.election.minimum.timeout.duration";
HDDS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
"hdds.ratis.leader.election.minimum.timeout.duration";
public static final TimeDuration
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
HDDS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(5, TimeUnit.SECONDS);

public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"dfs.ratis.snapshot.threshold";
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;
public static final String HDDS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"hdds.ratis.snapshot.threshold";
public static final long HDDS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;

// TODO : this is copied from OzoneConsts, may need to move to a better place
public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
Expand Down
Loading