diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 3a48ecaee5e4..1048ec5092c7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -183,7 +183,6 @@ long getStartTime() { private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); - private final RaftGroupId gid; private final ContainerDispatcher dispatcher; private final ContainerController containerController; private final XceiverServerRatis ratisServer; @@ -218,7 +217,6 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI ConfigurationSource conf, String threadNamePrefix) { this.datanodeService = hddsDatanodeService; - this.gid = gid; this.dispatcher = dispatcher; this.containerController = containerController; this.ratisServer = ratisServer; @@ -282,7 +280,7 @@ public void initialize( throws IOException { super.initialize(server, id, raftStorage); storage.init(raftStorage); - ratisServer.notifyGroupAdd(gid); + ratisServer.notifyGroupAdd(id); LOG.info("{}: initialize {}", server.getId(), id); loadSnapshot(storage.getLatestSnapshot()); @@ -293,7 +291,7 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot) if (snapshot == null) { TermIndex empty = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX); LOG.info("{}: The snapshot info is null. Setting the last applied index " + - "to:{}", gid, empty); + "to:{}", getGroupId(), empty); setLastAppliedTermIndex(empty); return empty.getIndex(); } @@ -301,7 +299,7 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot) final File snapshotFile = snapshot.getFile().getPath().toFile(); final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); - LOG.info("{}: Setting the last applied index to {}", gid, last); + LOG.info("{}: Setting the last applied index to {}", getGroupId(), last); setLastAppliedTermIndex(last); // initialize the dispatcher with snapshot so that it build the missing @@ -351,7 +349,7 @@ public long takeSnapshot() throws IOException { long startTime = Time.monotonicNow(); if (!isStateMachineHealthy()) { String msg = - "Failed to take snapshot " + " for " + gid + " as the stateMachine" + "Failed to take snapshot " + " for " + getGroupId() + " as the stateMachine" + " is unhealthy. The last applied index is at " + ti; StateMachineException sme = new StateMachineException(msg); LOG.error(msg); @@ -360,19 +358,19 @@ public long takeSnapshot() throws IOException { if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) { final File snapshotFile = storage.getSnapshotFile(ti.getTerm(), ti.getIndex()); - LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile); + LOG.info("{}: Taking a snapshot at:{} file {}", getGroupId(), ti, snapshotFile); try (FileOutputStream fos = new FileOutputStream(snapshotFile)) { persistContainerSet(fos); fos.flush(); // make sure the snapshot file is synced fos.getFD().sync(); } catch (IOException ioe) { - LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti, + LOG.error("{}: Failed to write snapshot at:{} file {}", getGroupId(), ti, snapshotFile); throw ioe; } LOG.info("{}: Finished taking a snapshot at:{} file:{} took: {} ms", - gid, ti, snapshotFile, (Time.monotonicNow() - startTime)); + getGroupId(), ti, snapshotFile, (Time.monotonicNow() - startTime)); return ti.getIndex(); } return -1; @@ -386,7 +384,7 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole rol final StateMachineLogEntryProto stateMachineLogEntry = entry.getStateMachineLogEntry(); final ContainerCommandRequestProto logProto; try { - logProto = getContainerCommandRequestProto(gid, stateMachineLogEntry.getLogData()); + logProto = getContainerCommandRequestProto(getGroupId(), stateMachineLogEntry.getLogData()); } catch (InvalidProtocolBufferException e) { trx.setException(e); return trx; @@ -413,7 +411,7 @@ public TransactionContext startTransaction(RaftClientRequest request) long startTime = Time.monotonicNowNanos(); final ContainerCommandRequestProto proto = message2ContainerCommandRequestProto(request.getMessage()); - Preconditions.checkArgument(request.getRaftGroupId().equals(gid)); + Preconditions.checkArgument(request.getRaftGroupId().equals(getGroupId())); final TransactionContext.Builder builder = TransactionContext.newBuilder() .setClientRequest(request) @@ -449,7 +447,7 @@ public TransactionContext startTransaction(RaftClientRequest request) final WriteChunkRequestProto.Builder commitWriteChunkProto = WriteChunkRequestProto.newBuilder(write) .clearData(); protoBuilder.setWriteChunk(commitWriteChunkProto) - .setPipelineID(gid.getUuid().toString()) + .setPipelineID(getGroupId().getUuid().toString()) .setTraceID(proto.getTraceID()); builder.setStateMachineData(write.getData()); @@ -491,20 +489,20 @@ private static ContainerCommandRequestProto getContainerCommandRequestProto( private ContainerCommandRequestProto message2ContainerCommandRequestProto( Message message) throws InvalidProtocolBufferException { - return ContainerCommandRequestMessage.toProto(message.getContent(), gid); + return ContainerCommandRequestMessage.toProto(message.getContent(), getGroupId()); } private ContainerCommandResponseProto dispatchCommand( ContainerCommandRequestProto requestProto, DispatcherContext context) { if (LOG.isTraceEnabled()) { - LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid, + LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", getGroupId(), requestProto.getCmdType(), requestProto.getContainerID(), requestProto.getPipelineID(), requestProto.getTraceID()); } ContainerCommandResponseProto response = dispatcher.dispatch(requestProto, context); if (LOG.isTraceEnabled()) { - LOG.trace("{}: response {}", gid, response); + LOG.trace("{}: response {}", getGroupId(), response); } return response; } @@ -531,7 +529,7 @@ private CompletableFuture writeStateMachineData( RaftServer server = ratisServer.getServer(); Preconditions.checkArgument(!write.getData().isEmpty()); try { - if (server.getDivision(gid).getInfo().isLeader()) { + if (server.getDivision(getGroupId()).getInfo().isLeader()) { stateMachineDataCache.put(entryIndex, write.getData()); } } catch (InterruptedException ioe) { @@ -559,7 +557,7 @@ private CompletableFuture writeStateMachineData( return dispatchCommand(requestProto, context); } catch (Exception e) { LOG.error("{}: writeChunk writeStateMachineData failed: blockId" + - "{} logIndex {} chunkName {}", gid, write.getBlockID(), + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), entryIndex, write.getChunkData().getChunkName(), e); metrics.incNumWriteDataFails(); // write chunks go in parallel. It's possible that one write chunk @@ -573,7 +571,7 @@ private CompletableFuture writeStateMachineData( writeChunkFutureMap.put(entryIndex, writeChunkFuture); if (LOG.isDebugEnabled()) { LOG.debug("{}: writeChunk writeStateMachineData : blockId" + - "{} logIndex {} chunkName {}", gid, write.getBlockID(), + "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(), entryIndex, write.getChunkData().getChunkName()); } // Remove the future once it finishes execution from the @@ -587,7 +585,7 @@ private CompletableFuture writeStateMachineData( && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); - LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" + + LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + write.getChunkData().getChunkName() + " Error message: " + r.getMessage() + " Container Result: " + r.getResult()); @@ -601,7 +599,7 @@ private CompletableFuture writeStateMachineData( metrics.incNumBytesWrittenCount( requestProto.getWriteChunk().getChunkData().getLen()); if (LOG.isDebugEnabled()) { - LOG.debug(gid + + LOG.debug(getGroupId() + ": writeChunk writeStateMachineData completed: blockId" + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + write.getChunkData().getChunkName()); @@ -622,7 +620,7 @@ private StateMachine.DataChannel getStreamDataChannel( DispatcherContext context) throws StorageContainerException { if (LOG.isDebugEnabled()) { LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " + - "traceID={}", gid, requestProto.getCmdType(), + "traceID={}", getGroupId(), requestProto.getCmdType(), requestProto.getContainerID(), requestProto.getPipelineID(), requestProto.getTraceID()); } @@ -781,7 +779,7 @@ private ByteString readStateMachineData( new StorageContainerException(response.getMessage(), response.getResult()); LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : " - + "{} Container Result: {}", gid, response.getCmdType(), index, + + "{} Container Result: {}", getGroupId(), response.getCmdType(), index, response.getMessage(), response.getResult()); stateMachineHealthy.set(false); throw sce; @@ -856,7 +854,7 @@ public CompletableFuture read(LogEntryProto entry, TransactionContex .map(TransactionContext::getStateMachineContext) .orElse(null); final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto() - : getContainerCommandRequestProto(gid, entry.getStateMachineLogEntry().getLogData()); + : getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData()); if (requestProto.getCmdType() != Type.WriteChunk) { throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() @@ -874,7 +872,7 @@ public CompletableFuture read(LogEntryProto entry, TransactionContex return future; } catch (Exception e) { metrics.incNumReadStateMachineFails(); - LOG.error("{} unable to read stateMachineData:", gid, e); + LOG.error("{} unable to read stateMachineData:", getGroupId(), e); return completeExceptionally(e); } } @@ -920,7 +918,7 @@ public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allS // from `HddsDatanodeService.stop()`, otherwise, it indicates this `close` originates from ratis. if (allServer) { if (datanodeService != null && !datanodeService.isStopped()) { - LOG.info("{} is closed by ratis", gid); + LOG.info("{} is closed by ratis", getGroupId()); if (semaphore.tryAcquire()) { // run with a different thread, so this raft group can be closed Runnable runnable = () -> { @@ -952,7 +950,7 @@ public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allS CompletableFuture.runAsync(runnable); } } else { - LOG.info("{} is closed by HddsDatanodeService", gid); + LOG.info("{} is closed by HddsDatanodeService", getGroupId()); } } } @@ -983,7 +981,7 @@ private CompletableFuture applyTransaction( private void removeStateMachineDataIfNeeded(long index) { if (waitOnBothFollowers) { try { - RaftServer.Division division = ratisServer.getServer().getDivision(gid); + RaftServer.Division division = ratisServer.getServer().getDivision(getGroupId()); if (division.getInfo().isLeader()) { long minIndex = Arrays.stream(division.getInfo() .getFollowerNextIndices()).min().getAsLong(); @@ -1041,7 +1039,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { CompletableFuture applyTransactionFuture = new CompletableFuture<>(); final Consumer exceptionHandler = e -> { - LOG.error(gid + ": failed to applyTransaction at logIndex " + index + LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index + " for " + requestProto.getCmdType(), e); stateMachineHealthy.compareAndSet(true, false); metrics.incNumApplyTransactionsFails(); @@ -1069,7 +1067,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { new StorageContainerException(r.getMessage(), r.getResult()); LOG.error( "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : " - + "{} Container Result: {}", gid, r.getCmdType(), index, + + "{} Container Result: {}", getGroupId(), r.getCmdType(), index, r.getMessage(), r.getResult()); metrics.incNumApplyTransactionsFails(); // Since the applyTransaction now is completed exceptionally, @@ -1078,12 +1076,12 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // shutdown. applyTransactionFuture.completeExceptionally(sce); stateMachineHealthy.compareAndSet(true, false); - ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole()); + ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole()); } else { if (LOG.isDebugEnabled()) { LOG.debug( "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : " - + "{} Container Result: {}", gid, r.getCmdType(), index, + + "{} Container Result: {}", getGroupId(), r.getCmdType(), index, r.getMessage(), r.getResult()); } if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) { @@ -1161,25 +1159,25 @@ public void evictStateMachineCache() { @Override public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) { - ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower); + ratisServer.handleFollowerSlowness(getGroupId(), roleInfoProto, follower); } @Override public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) { - ratisServer.handleNoLeader(gid, roleInfoProto); + ratisServer.handleNoLeader(getGroupId(), roleInfoProto); } @Override public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) { - LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry), + LOG.error("{}: {} {}", getGroupId(), TermIndex.valueOf(failedEntry), toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()), t); - ratisServer.handleNodeLogFailure(gid, t); + ratisServer.handleNodeLogFailure(getGroupId(), t); } @Override public CompletableFuture notifyInstallSnapshotFromLeader( RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { - ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto, + ratisServer.handleInstallSnapshotFromLeader(getGroupId(), roleInfoProto, firstTermIndexInLog); final CompletableFuture future = new CompletableFuture<>(); future.complete(firstTermIndexInLog); @@ -1188,7 +1186,7 @@ public CompletableFuture notifyInstallSnapshotFromLeader( @Override public void notifyGroupRemove() { - ratisServer.notifyGroupRemove(gid); + ratisServer.notifyGroupRemove(getGroupId()); // Make best effort to quasi-close all the containers on group removal. // Containers already in terminal state like CLOSED or UNHEALTHY will not // be affected. @@ -1196,7 +1194,7 @@ public void notifyGroupRemove() { try { containerController.markContainerForClose(cid); containerController.quasiCloseContainer(cid, - "Ratis group removed. Group id: " + gid); + "Ratis group removed. Group id: " + getGroupId()); } catch (IOException e) { LOG.debug("Failed to quasi-close container {}", cid); } @@ -1218,7 +1216,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, @Override public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) { - return smProtoToString(gid, containerController, proto); + return smProtoToString(getGroupId(), containerController, proto); } public static String smProtoToString(RaftGroupId gid,