Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,54 @@

package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;

import org.apache.hadoop.util.Time;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;

/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
Expand Down Expand Up @@ -309,10 +311,7 @@ public XceiverClientReply sendCommandAsync(
Time.monotonicNowNanos() - requestTime);
}).thenApply(reply -> {
try {
// we need to handle RaftRetryFailure Exception
RaftRetryFailureException raftRetryFailureException =
reply.getRetryFailureException();
if (raftRetryFailureException != null) {
if (!reply.isSuccess()) {
// in case of raft retry failure, the raft client is
// not able to connect to the leader hence the pipeline
// can not be used but this instance of RaftClient will close
Expand All @@ -324,7 +323,10 @@ public XceiverClientReply sendCommandAsync(
// to SCM as in this case, it is the raft client which is not
// able to connect to leader in the pipeline, though the
// pipeline can still be functional.
throw new CompletionException(raftRetryFailureException);
RaftException exception = reply.getException();
Preconditions.checkNotNull(exception, "Raft reply failure but " +
"no exception propagated.");
throw new CompletionException(exception);
}
ContainerCommandResponseProto response =
ContainerCommandResponseProto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public final class ScmConfigKeys {
"dfs.container.ratis.log.appender.queue.byte-limit";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
"dfs.container.ratis.log.purge.gap";
// TODO: Set to 1024 once RATIS issue around purge is fixed.
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
1000000000;
// expiry interval stateMachineData cache entry inside containerStateMachine
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
Expand Down Expand Up @@ -146,7 +151,7 @@ public final class ScmConfigKeys {

public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"dfs.ratis.snapshot.threshold";
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;

public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
"dfs.ratis.server.failure.duration";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ public final class OzoneConfigKeys {
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP;
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT;
public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY;
public static final TimeDuration
Expand Down
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@
<description>Byte limit for ratis leader's log appender queue.
</description>
</property>
<property>
<name>dfs.container.ratis.log.purge.gap</name>
<value>1024</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Purge gap between the last purged commit index
and the current index, when the leader decides to purge its log.
</description>
</property>
<property>
<name>dfs.container.ratis.datanode.storage.dir</name>
<value/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,66 +46,66 @@ public class ConfigFileGenerator extends AbstractProcessor {
@Override
public boolean process(Set<? extends TypeElement> annotations,
RoundEnvironment roundEnv) {
if (roundEnv.processingOver()) {
return false;
}

Filer filer = processingEnv.getFiler();

try {

//load existing generated config (if exists)
ConfigFileAppender appender = new ConfigFileAppender();
try (InputStream input = filer
.getResource(StandardLocation.CLASS_OUTPUT, "",
OUTPUT_FILE_NAME).openInputStream()) {
appender.load(input);
} catch (FileNotFoundException ex) {
appender.init();
}

Set<? extends Element> annotatedElements =
roundEnv.getElementsAnnotatedWith(ConfigGroup.class);
for (Element annotatedElement : annotatedElements) {
TypeElement configGroup = (TypeElement) annotatedElement;

//check if any of the setters are annotated with @Config
for (Element element : configGroup.getEnclosedElements()) {
if (element.getKind() == ElementKind.METHOD) {
processingEnv.getMessager()
.printMessage(Kind.WARNING, element.getSimpleName().toString());
if (element.getSimpleName().toString().startsWith("set")
&& element.getAnnotation(Config.class) != null) {

//update the ozone-site-generated.xml
Config configAnnotation = element.getAnnotation(Config.class);
ConfigGroup configGroupAnnotation =
configGroup.getAnnotation(ConfigGroup.class);

String key = configGroupAnnotation.prefix() + "."
+ configAnnotation.key();

appender.addConfig(key,
configAnnotation.defaultValue(),
configAnnotation.description(),
configAnnotation.tags());
}
}

}
FileObject resource = filer
.createResource(StandardLocation.CLASS_OUTPUT, "",
OUTPUT_FILE_NAME);

try (Writer writer = new OutputStreamWriter(
resource.openOutputStream(), StandardCharsets.UTF_8)) {
appender.write(writer);
}
}
} catch (IOException e) {
processingEnv.getMessager().printMessage(Kind.ERROR,
"Can't generate the config file from annotation: " + e.getMessage());
}
// if (roundEnv.processingOver()) {
// return false;
// }
//
// Filer filer = processingEnv.getFiler();
//
// try {
//
// //load existing generated config (if exists)
// ConfigFileAppender appender = new ConfigFileAppender();
// try (InputStream input = filer
// .getResource(StandardLocation.CLASS_OUTPUT, "",
// OUTPUT_FILE_NAME).openInputStream()) {
// appender.load(input);
// } catch (FileNotFoundException ex) {
// appender.init();
// }
//
// Set<? extends Element> annotatedElements =
// roundEnv.getElementsAnnotatedWith(ConfigGroup.class);
// for (Element annotatedElement : annotatedElements) {
// TypeElement configGroup = (TypeElement) annotatedElement;
//
// //check if any of the setters are annotated with @Config
// for (Element element : configGroup.getEnclosedElements()) {
// if (element.getKind() == ElementKind.METHOD) {
// processingEnv.getMessager()
// .printMessage(Kind.WARNING, element.getSimpleName().toString());
// if (element.getSimpleName().toString().startsWith("set")
// && element.getAnnotation(Config.class) != null) {
//
// //update the ozone-site-generated.xml
// Config configAnnotation = element.getAnnotation(Config.class);
// ConfigGroup configGroupAnnotation =
// configGroup.getAnnotation(ConfigGroup.class);
//
// String key = configGroupAnnotation.prefix() + "."
// + configAnnotation.key();
//
// appender.addConfig(key,
// configAnnotation.defaultValue(),
// configAnnotation.description(),
// configAnnotation.tags());
// }
// }
//
// }
// FileObject resource = filer
// .createResource(StandardLocation.CLASS_OUTPUT, "",
// OUTPUT_FILE_NAME);
//
// try (Writer writer = new OutputStreamWriter(
// resource.openOutputStream(), StandardCharsets.UTF_8)) {
// appender.write(writer);
// }
// }
// } catch (IOException e) {
// processingEnv.getMessager().printMessage(Kind.ERROR,
// "Can't generate the config file from annotation: " + e.getMessage());
// }
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerConstants;
Expand Down Expand Up @@ -651,14 +650,13 @@ private void evictStateMachineCache() {
}

@Override
public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(group, roleInfoProto);
public void notifySlowness(RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(gid, roleInfoProto);
}

@Override
public void notifyExtendedNoLeader(RaftGroup group,
RoleInfoProto roleInfoProto) {
ratisServer.handleNoLeader(group, roleInfoProto);
public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
ratisServer.handleNoLeader(gid, roleInfoProto);
}

@Override
Expand All @@ -667,6 +665,16 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
evictStateMachineCache();
}

@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
firstTermIndexInLog);
final CompletableFuture<TermIndex> future = new CompletableFuture<>();
future.complete(firstTermIndexInLog);
return future;
}

@Override
public void close() throws IOException {
evictStateMachineCache();
Expand Down
Loading