diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index efd82bce7bbd5..4a90e489c9bc1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -18,52 +18,54 @@
package org.apache.hadoop.hdds.scm;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-
-import io.opentracing.Scope;
-import io.opentracing.util.GlobalTracer;
-import org.apache.hadoop.util.Time;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.thirdparty.com.google.protobuf
- .InvalidProtocolBufferException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-
+import org.apache.hadoop.util.Time;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import io.opentracing.Scope;
+import io.opentracing.util.GlobalTracer;
/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -309,10 +311,7 @@ public XceiverClientReply sendCommandAsync(
Time.monotonicNowNanos() - requestTime);
}).thenApply(reply -> {
try {
- // we need to handle RaftRetryFailure Exception
- RaftRetryFailureException raftRetryFailureException =
- reply.getRetryFailureException();
- if (raftRetryFailureException != null) {
+ if (!reply.isSuccess()) {
// in case of raft retry failure, the raft client is
// not able to connect to the leader hence the pipeline
// can not be used but this instance of RaftClient will close
@@ -324,7 +323,10 @@ public XceiverClientReply sendCommandAsync(
// to SCM as in this case, it is the raft client which is not
// able to connect to leader in the pipeline, though the
// pipeline can still be functional.
- throw new CompletionException(raftRetryFailureException);
+ RaftException exception = reply.getException();
+ Preconditions.checkNotNull(exception, "Raft reply failure but " +
+ "no exception propagated.");
+ throw new CompletionException(exception);
}
ContainerCommandResponseProto response =
ContainerCommandResponseProto
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ae09c9d6d9d9a..a98739900c96f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -107,6 +107,11 @@ public final class ScmConfigKeys {
"dfs.container.ratis.log.appender.queue.byte-limit";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+ public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
+ "dfs.container.ratis.log.purge.gap";
+ // TODO: Set to 1024 once RATIS issue around purge is fixed.
+ public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
+ 1000000000;
// expiry interval stateMachineData cache entry inside containerStateMachine
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
@@ -146,7 +151,7 @@ public final class ScmConfigKeys {
public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"dfs.ratis.snapshot.threshold";
- public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
+ public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;
public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
"dfs.ratis.server.failure.duration";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1463c43e830f3..b77cca35a870f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -322,6 +322,10 @@ public final class OzoneConfigKeys {
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT;
+ public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
+ ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP;
+ public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
+ ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT;
public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY;
public static final TimeDuration
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 427def917e1ad..c10aa3353a0b2 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -104,6 +104,14 @@
Byte limit for ratis leader's log appender queue.
+
+ dfs.container.ratis.log.purge.gap
+ 1000000000
+ OZONE, DEBUG, CONTAINER, RATIS
+ Purge gap between the last purged commit index
+ and the current index, when the leader decides to purge its log.
+
+
dfs.container.ratis.datanode.storage.dir
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 7a7baec3001b2..44074e70d54db 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -28,12 +28,12 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
@@ -195,12 +195,12 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
throws IOException {
if (snapshot == null) {
TermIndex empty =
- TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
+ TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
LOG.info(
"The snapshot info is null." + "Setting the last applied index to:"
+ empty);
setLastAppliedTermIndex(empty);
- return RaftServerConstants.INVALID_LOG_INDEX;
+ return RaftLog.INVALID_LOG_INDEX;
}
final File snapshotFile = snapshot.getFile().getPath().toFile();
@@ -243,7 +243,7 @@ public void persistContainerSet(OutputStream out) throws IOException {
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
LOG.info("Taking snapshot at termIndex:" + ti);
- if (ti != null && ti.getIndex() != RaftServerConstants.INVALID_LOG_INDEX) {
+ if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("Taking a snapshot to file {}", snapshotFile);
@@ -651,14 +651,13 @@ private void evictStateMachineCache() {
}
@Override
- public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
- ratisServer.handleNodeSlowness(group, roleInfoProto);
+ public void notifySlowness(RoleInfoProto roleInfoProto) {
+ ratisServer.handleNodeSlowness(gid, roleInfoProto);
}
@Override
- public void notifyExtendedNoLeader(RaftGroup group,
- RoleInfoProto roleInfoProto) {
- ratisServer.handleNoLeader(group, roleInfoProto);
+ public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
+ ratisServer.handleNoLeader(gid, roleInfoProto);
}
@Override
@@ -667,6 +666,16 @@ public void notifyNotLeader(Collection pendingEntries)
evictStateMachineCache();
}
+ @Override
+ public CompletableFuture notifyInstallSnapshotFromLeader(
+ RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+ // Let the ratis server react to the leader's notification, then
+ // answer with an already-completed future carrying the same index.
+ ratisServer.handleInstallSnapshotFromLeader(
+ gid, roleInfoProto, firstTermIndexInLog);
+ return CompletableFuture.completedFuture(firstTermIndexInLog);
+ }
+
@Override
public void close() throws IOException {
evictStateMachineCache();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 424281891b680..246d58af2010f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -57,7 +57,6 @@
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -66,6 +65,7 @@
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -240,8 +240,9 @@ private RaftProperties newRaftProperties(Configuration conf) {
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
- RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements);
- RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit);
+ RaftServerConfigKeys.Log.setQueueElementLimit(
+ properties, logQueueNumElements);
+ RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit);
int numSyncRetries = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
@@ -251,8 +252,17 @@ private RaftProperties newRaftProperties(Configuration conf) {
numSyncRetries);
// Enable the StateMachineCaching
- RaftServerConfigKeys.Log.StateMachineData
- .setCachingEnabled(properties, true);
+ RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled(
+ properties, true);
+
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+ false);
+
+ int purgeGap = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT);
+ RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap);
+
return properties;
}
@@ -590,11 +600,32 @@ public List getPipelineIds() {
return pipelineIDs;
}
- void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
- handlePipelineFailure(group.getGroupId(), roleInfoProto);
+ void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
+ handlePipelineFailure(groupId, roleInfoProto);
}
- void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
- handlePipelineFailure(group.getGroupId(), roleInfoProto);
+ void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
+ handlePipelineFailure(groupId, roleInfoProto);
+ }
+
+ /**
+ * Handles an install-snapshot notification sent by the leader. The
+ * snapshot contents cannot be used to actually catch up the follower, so
+ * rather than installing a snapshot this logs a warning and reports a
+ * pipeline failure for the group, which initiates closing the pipeline.
+ *
+ * @param groupId id of the raft group (pipeline) the notification is for
+ * @param roleInfoProto information about the current node role and
+ * rpc delay information.
+ * @param firstTermIndexInLog term index carried by the leader's
+ * notification; it is only logged here.
+ */
+ void handleInstallSnapshotFromLeader(RaftGroupId groupId,
+ RoleInfoProto roleInfoProto,
+ TermIndex firstTermIndexInLog) {
+ LOG.warn("Install snapshot notification received from Leader with " +
+ "termIndex: {}, terminating pipeline: {}",
+ firstTermIndexInLog, groupId);
+ handlePipelineFailure(groupId, roleInfoProto);
+ }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index 0e87c2cc8db9f..12ed0a3f78650 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
0.5.0-SNAPSHOT
- 0.4.0-fe2b15d-SNAPSHOT
+ 0.4.0-2337318-SNAPSHOT
1.60
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index cd99cd1fab231..2cbef50cb0492 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.om.ratis;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -24,23 +26,18 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ServiceException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
@@ -51,7 +48,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ServiceException;
/**
* OM Ratis client to interact with OM Ratis server endpoint.
@@ -167,29 +166,25 @@ private CompletableFuture sendCommandAsync(OMRequest request) {
CompletableFuture raftClientReply =
sendRequestAsync(request);
- CompletableFuture omRatisResponse =
- raftClientReply.whenComplete((reply, e) -> LOG.debug(
- "received reply {} for request: cmdType={} traceID={} " +
- "exception: {}", reply, request.getCmdType(),
- request.getTraceID(), e))
- .thenApply(reply -> {
- try {
- // we need to handle RaftRetryFailure Exception
- RaftRetryFailureException raftRetryFailureException =
- reply.getRetryFailureException();
- if (raftRetryFailureException != null) {
- throw new CompletionException(raftRetryFailureException);
- }
-
- OMResponse response = OMRatisHelper
- .getOMResponseFromRaftClientReply(reply);
-
- return response;
- } catch (InvalidProtocolBufferException e) {
- throw new CompletionException(e);
- }
- });
- return omRatisResponse;
+ return raftClientReply.whenComplete((reply, e) -> LOG.debug(
+ "received reply {} for request: cmdType={} traceID={} " +
+ "exception: {}", reply, request.getCmdType(),
+ request.getTraceID(), e))
+ .thenApply(reply -> {
+ try {
+ Preconditions.checkNotNull(reply);
+ if (!reply.isSuccess()) {
+ RaftException exception = reply.getException();
+ Preconditions.checkNotNull(exception, "Raft reply failure " +
+ "but no exception propagated.");
+ throw new CompletionException(exception);
+ }
+ return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
+
+ } catch (InvalidProtocolBufferException e) {
+ throw new CompletionException(e);
+ }
+ });
}
/**
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 9fa1c8b6d506f..235627608915c 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -29,7 +29,7 @@
3.2.0
0.5.0-SNAPSHOT
0.5.0-SNAPSHOT
- 0.4.0-fe2b15d-SNAPSHOT
+ 0.4.0-2337318-SNAPSHOT
1.60
Crater Lake
${ozone.version}