Closed

Changes from all commits (77 commits):
c3986dc
HDDS-1156. testDelegationToken is failing in TestSecureOzoneCluster. …
ajayydv Mar 5, 2019
f4918ac
HDDS-623. On SCM UI, Node Manager info is empty (#523)
elek Mar 4, 2019
c57e1dd
HDDS-1218. Do the dist-layout-stitching for Ozone after the test-comp…
arp7 Mar 5, 2019
4cf50e3
HDDS-1193. Refactor ContainerChillModeRule and DatanodeChillMode rule…
bharatviswa504 Mar 5, 2019
6eb7981
HDDS-1171. Add benchmark for OM and OM client in Genesis.
anuengineer Mar 5, 2019
8d906d7
HDDS-919. Enable prometheus endpoints for Ozone datanodes (#502)
elek Mar 5, 2019
b99ae7b
HDDS-935. Avoid creating an already created container on a datanode i…
bshashikant Mar 5, 2019
8655abb
HDDS-1184. Parallelization of write chunks in datanodes is broken. Co…
bshashikant Mar 6, 2019
ab5da9a
HDDS-1219. TestContainerActionsHandler.testCloseContainerAction has a…
elek Mar 5, 2019
a50ffd8
HDDS-1222. Remove TestContainerSQLCli unit test stub. Contributed by …
elek Mar 5, 2019
5502da4
HDDS-1216. Change name of ozoneManager service in docker compose file…
Mar 6, 2019
9cd3fe4
HDDS-1113. Remove default dependencies from hadoop-ozone project. Con…
elek Feb 15, 2019
67a26ac
HDDS-1093. Configuration tab in OM/SCM ui is not displaying the corre…
vivekratnavel Mar 7, 2019
62ab342
HDDS-1226. ozone-filesystem jar missing in hadoop classpath
vivekratnavel Mar 7, 2019
c334bc8
HDDS-1208. ContainerStateMachine should set chunk data as state machi…
lokeshj1703 Mar 6, 2019
118986c
HDDS-1213. Support plain text S3 MPU initialization request.
elek Mar 8, 2019
6465125
HDDS-594. SCM CA: DN sends CSR and uses certificate issued by SCM. Co…
ajayydv Mar 7, 2019
4921797
HDDS-1235. BaseHttpServer NPE is HTTP policy is HTTPS_ONLY. Contribut…
xiaoyuyao Mar 9, 2019
e65717a
HDDS-1242. In S3 when bucket already exists, it should just return lo…
bharatviswa504 Mar 10, 2019
60b5e17
HDDS-1220. KeyManager#openKey should release the bucket lock before d…
lokeshj1703 Mar 11, 2019
aa9de9a
HDDS-1210. Ratis pipeline creation doesn't check raft client reply st…
mukul1987 Mar 6, 2019
5d67a17
HDDS-1238. Fix Ratis Snapshot creation error if the snapshot file alr…
mukul1987 Mar 9, 2019
89fb60f
HDDS-1236. Fix incorrect Ozone ClientProtocol KerberosInfo annotation…
xiaoyuyao Mar 11, 2019
fe41e67
Revert "HDDS-1236. Fix incorrect Ozone ClientProtocol KerberosInfo an…
xiaoyuyao Mar 11, 2019
1050bf4
HDDS-1236. Fix incorrect Ozone ClientProtocol KerberosInfo annotation…
xiaoyuyao Mar 11, 2019
6028aef
HDDS-596. Add robot test for OM Block Token. Contributed by Ajay Kumar.
ajayydv Mar 11, 2019
e673beb
HDDS-1245. OM delegation expiration time should use Time.now instead …
xiaoyuyao Mar 12, 2019
4373c12
HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contribute…
bshashikant Mar 11, 2019
97fd302
HDDS-1095. OzoneManager#openKey should do multiple block allocations …
mukul1987 Mar 12, 2019
262260f
HDDS-1253. Fix checkstyle issue from Nightly run. Contributed by Xiao…
xiaoyuyao Mar 12, 2019
15b6e38
HDDS-1043. Enable token based authentication for S3 api
ajayydv Mar 12, 2019
32477ed
HDDS-1226. Addendum. ozone-filesystem jar missing in hadoop classpath
vivekratnavel Mar 7, 2019
b5b3f4d
HDDS-1128. Create stateful manager class for the pipeline creation sc…
lokeshj1703 Mar 13, 2019
34f0ad0
HDDS-1209. Fix the block allocation logic in SCM when client wants to…
Mar 13, 2019
eeb7d63
HDDS-1087. Fix TestDefaultCertificateClient#testSignDataStream. Contr…
xiaoyuyao Mar 13, 2019
0f4ae39
HDDS-1254. Fix failure in TestOzoneManagerHttpServer & TestStorageCon…
ajayydv Mar 13, 2019
f38a8a9
HDDS-1241. Update ozone to latest ratis snapshot build (0.4.0-5680cf5…
mukul1987 Mar 14, 2019
8e66b62
HDDS-1237. Fix test TestSecureContainerServer.testClientServerRatisGr…
mukul1987 Mar 14, 2019
8614da3
HDDS-1257. Incorrect object because of mismatch in block lengths. Con…
bshashikant Mar 14, 2019
c00e704
HDDS-1265. ozone sh s3 getsecret throws Null Pointer Exception for un…
vivekratnavel Mar 15, 2019
64726dd
HDDS-761. Create S3 subcommand to run S3 related operations.
vivekratnavel Mar 15, 2019
8bd13ff
HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Sha…
bshashikant Mar 15, 2019
3fd9c0d
HDDS-807. Period should be an invalid character in bucket names. Cont…
elek Mar 12, 2019
522976e
HDDS-1259. OzoneFS classpath separation is broken by the token valida…
elek Mar 15, 2019
3ca1024
HDDS-1138. Ozone Client should avoid talking to SCM directly. Contrib…
xiaoyuyao Mar 15, 2019
a009255
HDDS-1283. Fix the dynamic documentation of basic s3 client usage. Co…
elek Mar 15, 2019
0d35cfc
HDDS-1284. Adjust default values of pipline recovery for more resilie…
elek Mar 15, 2019
328bfcc
HDDS-1263. SCM CLI does not list container with id 1.
vivekratnavel Mar 15, 2019
6fd95b8
HDDS-1296. Fix checkstyle issue from Nightly run. Contributed by Xiao…
xiaoyuyao Mar 18, 2019
7cd82b4
Revert "HDDS-1284. Adjust default values of pipline recovery for more…
arp7 Mar 18, 2019
d2e115d
HDDS-1119. DN get OM certificate from SCM CA for block token validat…
ajayydv Mar 19, 2019
a55a295
HDDS-1215. Change hadoop-runner and apache/hadoop base image to use J…
xiaoyuyao Mar 19, 2019
3edb697
HDDS-1307. Test ScmChillMode testChillModeOperations failed. (#622)
bharatviswa504 Mar 19, 2019
598c16c
HDDS-1072. Implement RetryProxy and FailoverProxy for OM client.
hanishakoneru Mar 5, 2019
b2c63c3
HDDS-1246. Add ozone delegation token utility subcmd for Ozone CLI. C…
xiaoyuyao Mar 15, 2019
6c009a3
HDDS-1321. TestOzoneManagerHttpServer depends on hard-coded port numb…
arp7 Mar 21, 2019
8f422d7
HDDS-1320. Update ozone to latest ratis snapshot build (0.4.0-1fc5ace…
arp7 Mar 21, 2019
6d73e7b
HDDS-1323. Ignore unit test TestFailureHandlingByClient. Contributed …
arp7 Mar 22, 2019
9b989c4
HDDS-1291. Set OmKeyArgs#refreshPipeline flag properly to avoid readi…
xiaoyuyao Mar 22, 2019
6b3e0b3
HDDS-1302. Fix SCM CLI does not list container with id 1.
vivekratnavel Mar 22, 2019
8d1c218
HDDS-1299. Support TokenIssuer interface for running jobs with OzoneF…
xiaoyuyao Mar 23, 2019
eed623a
HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsExceptio…
bshashikant Mar 25, 2019
f6acbc9
HDDS-1310. In datanode once a container becomes unhealthy, datanode r…
snemuri Mar 26, 2019
447d534
HDDS-939. Add S3 access check to Ozone manager. Contributed by Ajay K…
ajayydv Mar 26, 2019
299177e
HDDS-139. Output of createVolume can be improved. Contributed by Shweta.
Mar 27, 2019
5f8ded5
HDDS-1346. Remove hard-coded version ozone-0.5.0 from ReadMe of ozone…
xiaoyuyao Mar 28, 2019
f8f85fc
HDDS-1318. Fix MalformedTracerStateStringException on DN logs. Contri…
xiaoyuyao Mar 28, 2019
c53408c
HDDS-1293. ExcludeList#getProtoBuf throws ArrayIndexOutOfBoundsExcept…
bshashikant Mar 28, 2019
f2dee89
HDDS-1309 . change logging from warn to debug in XceiverClient. Contr…
nilotpalnandi Mar 28, 2019
4345e6e
[HDDS-1351] NoClassDefFoundError when running ozone genconf (). Contr…
adoroszlai Mar 29, 2019
f0640e2
HDDS-1357. ozone s3 shell command has confusing subcommands (#663)
elek Mar 30, 2019
bb20c80
HDDS-1312. Add more unit tests to verify BlockOutputStream functional…
bshashikant Apr 1, 2019
0b84c2d
HDDS-1067 . freon run on client gets hung when two of the datanodes a…
bshashikant Apr 1, 2019
fb7844d
HDDS-1255. Refactor ozone acceptance test to allow run in secure mode
Apr 1, 2019
0781a71
HDDS-1337. Handle GroupMismatchException in OzoneClient. Contributed …
linyiqun Apr 2, 2019
7030e6e
HDDS-1355. Only FQDN is accepted for OM rpc address in secure environ…
Apr 2, 2019
52da317
HDDS-1329. Update documentation for Ozone-0.4.0 release. Contributed …
Apr 3, 2019
Files changed
@@ -249,7 +249,7 @@ public boolean isHealthy() {
   }
 
   /**
-   * Sets if the node is healhty or not considering disks' health also.
+   * Sets if the node is healthy or not considering disks' health also.
    *
    * @param isHealthy
    *          if or not node is healthy
XceiverClientGrpc.java

@@ -33,6 +33,8 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -136,7 +138,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
     NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
         .getIpAddress(), port).usePlaintext()
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-        .intercept(new ClientCredentialInterceptor(userName, encodedToken));
+        .intercept(new ClientCredentialInterceptor(userName, encodedToken),
+            new GrpcClientInterceptor());
     if (secConfig.isGrpcTlsEnabled()) {
       File trustCertCollectionFile = secConfig.getTrustStoreFile();
       File privateKeyFile = secConfig.getClientPrivateKeyFile();
@@ -204,7 +207,7 @@ public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
     try {
       XceiverClientReply reply;
-      reply = sendCommandWithRetry(request, null);
+      reply = sendCommandWithTraceIDAndRetry(request, null);
       ContainerCommandResponseProto responseProto = reply.getResponse().get();
       return responseProto;
     } catch (ExecutionException | InterruptedException e) {
@@ -217,7 +220,21 @@ public XceiverClientReply sendCommand(
       ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     Preconditions.checkState(HddsUtils.isReadOnly(request));
-    return sendCommandWithRetry(request, excludeDns);
+    return sendCommandWithTraceIDAndRetry(request, excludeDns);
   }
 
+  private XceiverClientReply sendCommandWithTraceIDAndRetry(
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
+      throws IOException {
+    try (Scope scope = GlobalTracer.get()
+        .buildSpan("XceiverClientGrpc." + request.getCmdType().name())
+        .startActive(true)) {
+      ContainerCommandRequestProto finalPayload =
+          ContainerCommandRequestProto.newBuilder(request)
+              .setTraceID(TracingUtil.exportCurrentSpan())
+              .build();
+      return sendCommandWithRetry(finalPayload, excludeDns);
+    }
+  }
+
   private XceiverClientReply sendCommandWithRetry(
@@ -253,7 +270,7 @@ private XceiverClientReply sendCommandWithRetry(
           break;
         }
       } catch (ExecutionException | InterruptedException e) {
-        LOG.warn("Failed to execute command " + request + " on datanode " + dn
+        LOG.debug("Failed to execute command " + request + " on datanode " + dn
             .getUuidString(), e);
         if (Status.fromThrowable(e.getCause()).getCode()
             == Status.UNAUTHENTICATED.getCode()) {
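The XceiverClientGrpc change threads tracing through the client in two steps: every outgoing gRPC channel gets a GrpcClientInterceptor, and every command is wrapped in an OpenTracing span whose context is serialized into the request's traceID field. Below is a minimal, self-contained sketch of that span-per-request pattern, assuming the OpenTracing 0.31 API that the startActive(true) call implies; serializeContext is a hypothetical stand-in for TracingUtil.exportCurrentSpan, which is not shown in this diff.

```java
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;

public final class TracedRequestSketch {

  // Hypothetical stand-in for TracingUtil.exportCurrentSpan(): serialize
  // the active span's context so the server side can continue the trace.
  private static String serializeContext(Scope scope) {
    return String.valueOf(scope.span().context());
  }

  // Wrap one RPC in a span named after the command type; startActive(true)
  // finishes the span automatically when the scope closes.
  public static String sendTraced(String cmdType, String payload) {
    try (Scope scope = GlobalTracer.get()
        .buildSpan("XceiverClientGrpc." + cmdType)
        .startActive(true)) {
      // The patch copies this serialized context into the protobuf
      // request's traceID field before handing it to the retry loop.
      return payload + " [traceID=" + serializeContext(scope) + "]";
    }
  }

  public static void main(String[] args) {
    // With no tracer registered, GlobalTracer falls back to a no-op tracer,
    // so this still runs; a real deployment would register e.g. Jaeger.
    System.out.println(sendTraced("ReadChunk", "request-bytes"));
  }
}
```

Note that in the patch the traceID is injected inside the same try-with-resources that invokes the retry loop, so all retried sends to different datanodes land under a single span.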
XceiverClientMetrics.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -37,7 +38,9 @@ public class XceiverClientMetrics {
       .getSimpleName();
 
   private @Metric MutableCounterLong pendingOps;
+  private @Metric MutableCounterLong totalOps;
   private MutableCounterLong[] pendingOpsArray;
+  private MutableCounterLong[] opsArray;
   private MutableRate[] containerOpsLatency;
   private MetricsRegistry registry;
 
@@ -46,12 +49,17 @@ public XceiverClientMetrics() {
     this.registry = new MetricsRegistry(SOURCE_NAME);
 
     this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+    this.opsArray = new MutableCounterLong[numEnumEntries];
     this.containerOpsLatency = new MutableRate[numEnumEntries];
     for (int i = 0; i < numEnumEntries; i++) {
       pendingOpsArray[i] = registry.newCounter(
           "numPending" + ContainerProtos.Type.forNumber(i + 1),
           "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
           (long) 0);
+      opsArray[i] = registry
+          .newCounter("opCount" + ContainerProtos.Type.forNumber(i + 1),
+              "number of" + ContainerProtos.Type.forNumber(i + 1) + " ops",
+              (long) 0);
 
       containerOpsLatency[i] = registry.newRate(
           ContainerProtos.Type.forNumber(i + 1) + "Latency",
@@ -68,6 +76,8 @@ public static XceiverClientMetrics create() {
 
   public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
     pendingOps.incr();
+    totalOps.incr();
+    opsArray[type.ordinal()].incr();
     pendingOpsArray[type.ordinal()].incr();
   }
 
@@ -85,6 +95,16 @@ public long getContainerOpsMetrics(ContainerProtos.Type type) {
     return pendingOpsArray[type.ordinal()].value();
   }
 
+  @VisibleForTesting
+  public long getTotalOpCount() {
+    return totalOps.value();
+  }
+
+  @VisibleForTesting
+  public long getContainerOpCountMetrics(ContainerProtos.Type type) {
+    return opsArray[type.ordinal()].value();
+  }
+
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);
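XceiverClientMetrics previously tracked only pending ops; the patch adds a cumulative totalOps counter and a per-command-type opCount array, one slot per protobuf enum value, read back through two @VisibleForTesting getters. A rough sketch of the same bookkeeping with plain JDK atomics follows; AtomicLong and AtomicLongArray stand in for Hadoop's MutableCounterLong and MetricsRegistry, and CmdType is an illustrative stand-in for ContainerProtos.Type.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Stand-in for ContainerProtos.Type; any enum works for the pattern.
enum CmdType { CreateContainer, WriteChunk, ReadChunk, PutBlock }

class ClientOpMetricsSketch {
  private final AtomicLong totalOps = new AtomicLong();
  private final AtomicLong pendingOps = new AtomicLong();
  // One slot per command type, indexed by ordinal(), mirroring the
  // opsArray and pendingOpsArray fields in XceiverClientMetrics.
  private final AtomicLongArray opsByType =
      new AtomicLongArray(CmdType.values().length);
  private final AtomicLongArray pendingByType =
      new AtomicLongArray(CmdType.values().length);

  void incrPending(CmdType type) {
    pendingOps.incrementAndGet();
    totalOps.incrementAndGet();               // counts every submitted op
    opsByType.incrementAndGet(type.ordinal());
    pendingByType.incrementAndGet(type.ordinal());
  }

  void decrPending(CmdType type) {
    pendingOps.decrementAndGet();
    pendingByType.decrementAndGet(type.ordinal());
  }

  long totalOpCount() { return totalOps.get(); }

  long opCount(CmdType type) { return opsByType.get(type.ordinal()); }

  public static void main(String[] args) {
    ClientOpMetricsSketch m = new ClientOpMetricsSketch();
    m.incrPending(CmdType.WriteChunk);
    m.decrPending(CmdType.WriteChunk);
    System.out.println(m.totalOpCount() + " " + m.opCount(CmdType.WriteChunk));
  }
}
```

One subtlety carried over from the original class: counter names are registered with ContainerProtos.Type.forNumber(i + 1) while lookups use type.ordinal(), which only line up because the proto enum values are contiguous starting at 1.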
XceiverClientRatis.java

@@ -18,13 +18,15 @@
 
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 
 import io.opentracing.Scope;
 import io.opentracing.util.GlobalTracer;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -47,6 +49,7 @@
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,14 +77,16 @@ public static XceiverClientRatis newXceiverClientRatis(
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final TimeDuration clientRequestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
     final int maxOutstandingRequests =
         HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
         SecurityConfig(ozoneConf));
     return new XceiverClientRatis(pipeline,
         SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
-        retryPolicy, tlsConfig);
+        retryPolicy, tlsConfig, clientRequestTimeout);
   }
 
   private final Pipeline pipeline;
@@ -90,19 +95,22 @@ public static XceiverClientRatis newXceiverClientRatis(
   private final int maxOutstandingRequests;
   private final RetryPolicy retryPolicy;
   private final GrpcTlsConfig tlsConfig;
+  private final TimeDuration clientRequestTimeout;
 
   // Map to track commit index at every server
   private final ConcurrentHashMap<UUID, Long> commitInfoMap;
 
   // create a separate RaftClient for watchForCommit API
   private RaftClient watchClient;
 
+  private XceiverClientMetrics metrics;
+
   /**
    * Constructs a client.
    */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
       int maxOutStandingChunks, RetryPolicy retryPolicy,
-      GrpcTlsConfig tlsConfig) {
+      GrpcTlsConfig tlsConfig, TimeDuration timeout) {
     super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
@@ -111,6 +119,8 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
     commitInfoMap = new ConcurrentHashMap<>();
     watchClient = null;
     this.tlsConfig = tlsConfig;
+    this.clientRequestTimeout = timeout;
+    metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   private void updateCommitInfosMap(
@@ -160,7 +170,7 @@ public void connect() throws Exception {
     // requests to be handled by raft client
     if (!client.compareAndSet(null,
         RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
-            maxOutstandingRequests, tlsConfig))) {
+            maxOutstandingRequests, tlsConfig, clientRequestTimeout))) {
       throw new IllegalStateException("Client is already connected.");
     }
   }
@@ -194,6 +204,12 @@ private RaftClient getClient() {
     return Objects.requireNonNull(client.get(), "client is null");
   }
 
+
+  @VisibleForTesting
+  public ConcurrentHashMap<UUID, Long> getCommitInfoMap() {
+    return commitInfoMap;
+  }
+
   private CompletableFuture<RaftClientReply> sendRequestAsync(
       ContainerCommandRequestProto request) {
     try (Scope scope = GlobalTracer.get()
@@ -243,7 +259,7 @@ public XceiverClientReply watchForCommit(long index, long timeout)
     if (watchClient == null) {
       watchClient =
           RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
-              maxOutstandingRequests, tlsConfig);
+              maxOutstandingRequests, tlsConfig, clientRequestTimeout);
     }
     CompletableFuture<RaftClientReply> replyFuture = watchClient
         .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
@@ -260,9 +276,9 @@ public XceiverClientReply watchForCommit(long index, long timeout)
       // TODO : need to remove the code to create the new RaftClient instance
       // here once the watch request bypassing sliding window in Raft Client
      // gets fixed.
-      watchClient = RatisHelper
-          .newRaftClient(rpcType, getPipeline(), retryPolicy,
-              maxOutstandingRequests, tlsConfig);
+      watchClient =
+          RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
+              maxOutstandingRequests, tlsConfig, clientRequestTimeout);
       reply = watchClient
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
@@ -296,47 +312,52 @@ public XceiverClientReply watchForCommit(long index, long timeout)
   public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request) {
     XceiverClientReply asyncReply = new XceiverClientReply(null);
+    long requestTime = Time.monotonicNowNanos();
     CompletableFuture<RaftClientReply> raftClientReply =
         sendRequestAsync(request);
+    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
     CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
-        raftClientReply.whenComplete((reply, e) -> LOG.debug(
-            "received reply {} for request: cmdType={} containerID={}"
-                + " pipelineID={} traceID={} exception: {}", reply,
-            request.getCmdType(), request.getContainerID(),
-            request.getPipelineID(), request.getTraceID(), e))
-            .thenApply(reply -> {
-              try {
-                // we need to handle RaftRetryFailure Exception
-                RaftRetryFailureException raftRetryFailureException =
-                    reply.getRetryFailureException();
-                if (raftRetryFailureException != null) {
-                  // in case of raft retry failure, the raft client is
-                  // not able to connect to the leader hence the pipeline
-                  // can not be used but this instance of RaftClient will close
-                  // and refreshed again. In case the client cannot connect to
-                  // leader, getClient call will fail.
+        raftClientReply.whenComplete((reply, e) -> {
+          LOG.debug("received reply {} for request: cmdType={} containerID={}"
+              + " pipelineID={} traceID={} exception: {}", reply,
+              request.getCmdType(), request.getContainerID(),
+              request.getPipelineID(), request.getTraceID(), e);
+          metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+          metrics.addContainerOpsLatency(request.getCmdType(),
+              Time.monotonicNowNanos() - requestTime);
+        }).thenApply(reply -> {
+          try {
+            // we need to handle RaftRetryFailure Exception
+            RaftRetryFailureException raftRetryFailureException =
+                reply.getRetryFailureException();
+            if (raftRetryFailureException != null) {
+              // in case of raft retry failure, the raft client is
+              // not able to connect to the leader hence the pipeline
+              // can not be used but this instance of RaftClient will close
+              // and refreshed again. In case the client cannot connect to
+              // leader, getClient call will fail.
 
-                  // No need to set the failed Server ID here. Ozone client
-                  // will directly exclude this pipeline in next allocate block
-                  // to SCM as in this case, it is the raft client which is not
-                  // able to connect to leader in the pipeline, though the
-                  // pipeline can still be functional.
-                  throw new CompletionException(raftRetryFailureException);
-                }
-                ContainerCommandResponseProto response =
-                    ContainerCommandResponseProto
-                        .parseFrom(reply.getMessage().getContent());
-                UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
-                if (response.getResult() == ContainerProtos.Result.SUCCESS) {
-                  updateCommitInfosMap(reply.getCommitInfos());
-                }
-                asyncReply.setLogIndex(reply.getLogIndex());
-                addDatanodetoReply(serverId, asyncReply);
-                return response;
-              } catch (InvalidProtocolBufferException e) {
-                throw new CompletionException(e);
-              }
-            });
+              // No need to set the failed Server ID here. Ozone client
+              // will directly exclude this pipeline in next allocate block
+              // to SCM as in this case, it is the raft client which is not
+              // able to connect to leader in the pipeline, though the
+              // pipeline can still be functional.
+              throw new CompletionException(raftRetryFailureException);
+            }
+            ContainerCommandResponseProto response =
+                ContainerCommandResponseProto
+                    .parseFrom(reply.getMessage().getContent());
+            UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
+            if (response.getResult() == ContainerProtos.Result.SUCCESS) {
+              updateCommitInfosMap(reply.getCommitInfos());
+            }
+            asyncReply.setLogIndex(reply.getLogIndex());
+            addDatanodetoReply(serverId, asyncReply);
+            return response;
+          } catch (InvalidProtocolBufferException e) {
+            throw new CompletionException(e);
+          }
+        });
     asyncReply.setResponse(containerCommandResponse);
     return asyncReply;
   }
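Two themes run through the XceiverClientRatis diff: a configurable clientRequestTimeout is now threaded into every RaftClient the class builds (in connect, in watchForCommit, and in the watch-client rebuild), and sendCommandAsync measures per-command latency by capturing a monotonic start time and recording it in whenComplete. The sketch below isolates that second pattern in plain JDK terms; System.nanoTime() stands in for Hadoop's Time.monotonicNowNanos(), and the AtomicLong fields for the MutableRate and MutableCounterLong metrics.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

final class AsyncLatencySketch {
  // Accumulated latency in nanoseconds; stands in for a MutableRate metric.
  static final AtomicLong totalLatencyNanos = new AtomicLong();
  static final AtomicLong pending = new AtomicLong();

  static CompletableFuture<String> sendTimed(CompletableFuture<String> rpc) {
    long start = System.nanoTime(); // stand-in for Time.monotonicNowNanos()
    pending.incrementAndGet();
    return rpc
        // whenComplete runs on success *and* failure, so pending-op and
        // latency bookkeeping can never be skipped by an exception...
        .whenComplete((reply, error) -> {
          pending.decrementAndGet();
          totalLatencyNanos.addAndGet(System.nanoTime() - start);
        })
        // ...while thenApply is skipped on failure, so response parsing
        // stays on the happy path, as in the patched sendCommandAsync.
        .thenApply(reply -> "parsed:" + reply);
  }

  public static void main(String[] args) {
    CompletableFuture<String> rpc =
        CompletableFuture.supplyAsync(() -> "raft-reply");
    System.out.println(sendTimed(rpc).join());
    System.out.println("pending=" + pending.get()
        + " latencyNanos=" + totalLatencyNanos.get());
  }
}
```

This split is why the patch converts whenComplete from a one-expression lambda into a block body: the logging stays where it was, and the decrement and latency calls ride along on the stage that is guaranteed to run.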