Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
Expand Down Expand Up @@ -69,7 +70,8 @@
* The underlying RPC mechanism can be chosen via the constructor.
*/
public final class XceiverClientRatis extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
public static final Logger LOG =
LoggerFactory.getLogger(XceiverClientRatis.class);

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
Expand Down Expand Up @@ -248,13 +250,17 @@ public XceiverClientReply watchForCommit(long index, long timeout)
return clientReply;
}
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
RaftClientReply reply;
try {
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
replyFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException toe) {
LOG.warn("3 way commit failed ", toe);
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed ", e);
if (t instanceof GroupMismatchException) {
throw e;
}
reply = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get(timeout, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
Expand All @@ -40,6 +43,10 @@
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,8 +57,12 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;

/**
* Utility methods for Ozone and Container Clients.
Expand All @@ -72,6 +83,18 @@ public final class HddsClientUtils {
private HddsClientUtils() {
}

private static final List<Class<? extends Exception>> EXCEPTION_LIST =
new ArrayList<Class<? extends Exception>>() {{
add(TimeoutException.class);
add(ContainerNotOpenException.class);
add(RaftRetryFailureException.class);
add(AlreadyClosedException.class);
add(GroupMismatchException.class);
// Not Replicated Exception will be thrown if watch For commit
// does not succeed
add(NotReplicatedException.class);
}};

/**
* Date format that used in ozone. Here the format is thread safe to use.
*/
Expand Down Expand Up @@ -290,4 +313,49 @@ public static SCMSecurityProtocol getScmSecurityClient(
Client.getRpcTimeout(conf)));
return scmSecurityClient;
}

public static Throwable checkForException(Exception e) throws IOException {
Throwable t = e;
while (t != null) {
for (Class<? extends Exception> cls : getExceptionList()) {
if (cls.isInstance(t)) {
return t;
}
}
t = t.getCause();
}

throw e instanceof IOException ? (IOException)e : new IOException(e);
}

public static RetryPolicy createRetryPolicy(int maxRetryCount,
long retryInterval) {
// retry with fixed sleep between retries
return RetryPolicies.retryUpToMaximumCountWithFixedSleep(
maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
}

public static Map<Class<? extends Throwable>,
RetryPolicy> getRetryPolicyByException(int maxRetryCount,
long retryInterval) {
Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
for (Class<? extends Exception> ex : EXCEPTION_LIST) {
if (ex == TimeoutException.class
|| ex == RaftRetryFailureException.class) {
// retry without sleep
policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
} else {
// retry with fixed sleep between retries
policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
}
}
// Default retry policy
policyMap
.put(Exception.class, createRetryPolicy(maxRetryCount, retryInterval));
return policyMap;
}

public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream {
public static final Logger LOG =
LoggerFactory.getLogger(BlockOutputStream.class);

private BlockID blockID;
private volatile BlockID blockID;
private final String key;
private final String traceID;
private final BlockData.Builder containerBlockData;
Expand Down Expand Up @@ -574,14 +574,18 @@ public void cleanup(boolean invalidateClient) {
* @throws IOException if stream is closed
*/
private void checkOpen() throws IOException {
if (xceiverClient == null) {
if (isClosed()) {
throw new IOException("BlockOutputStream has been closed.");
} else if (getIoException() != null) {
adjustBuffersOnException();
throw getIoException();
}
}

public boolean isClosed() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This need not be public. If you need it for testing, please annotate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be pulic because this function gets invoked from BlockoutputStreamEntryPool#isClosed() and they are in different packages.

return xceiverClient == null;
}

/**
* Writes buffered data as a new chunk to the container and saves chunk
* information to be used later in putKey call.
Expand Down Expand Up @@ -635,4 +639,9 @@ private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
+ " length " + effectiveChunkSize);
containerBlockData.addChunks(chunkInfo);
}

@VisibleForTesting
public void setXceiverClient(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ void releaseBuffersOnException() {
*/
public XceiverClientReply watchForCommit(long commitIndex)
throws IOException {
Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
long index;
try {
XceiverClientReply reply =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ public final class ScmConfigKeys {
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
"dfs.ratis.client.request.max.retries";
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
"dfs.ratis.client.request.retry.interval";
public static final TimeDuration
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
"dfs.ratis.server.retry-cache.timeout.duration";
public static final TimeDuration
Expand Down
4 changes: 2 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@
</property>
<property>
<name>dfs.ratis.client.request.max.retries</name>
<value>20</value>
<value>180</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>Number of retries for ratis client request.</description>
</property>
<property>
<name>dfs.ratis.client.request.retry.interval</name>
<value>500ms</value>
<value>1000ms</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>Interval between successive retries for a ratis client request.
</description>
Expand Down
2 changes: 1 addition & 1 deletion hadoop-hdds/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.5.0-SNAPSHOT</hdds.version>

<!-- Apache Ratis version -->
<ratis.version>0.3.0</ratis.version>
<ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>

<bouncycastle.version>1.60</bouncycastle.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
package org.apache.hadoop.ozone.client;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.OzoneConsts;
Expand All @@ -36,23 +32,11 @@
import org.apache.hadoop.ozone.client.rest.response.KeyLocation;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;

/** A utility class for OzoneClient. */
public final class OzoneClientUtils {

private OzoneClientUtils() {}

private static final List<Class<? extends Exception>> EXCEPTION_LIST =
new ArrayList<Class<? extends Exception>>() {{
add(TimeoutException.class);
add(ContainerNotOpenException.class);
add(RaftRetryFailureException.class);
add(AlreadyClosedException.class);
add(GroupMismatchException.class);
}};
/**
* Returns a BucketInfo object constructed using fields of the input
* OzoneBucket object.
Expand Down Expand Up @@ -141,26 +125,4 @@ public static RetryPolicy createRetryPolicy(int maxRetryCount,
maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
}

public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}

public static Map<Class<? extends Throwable>, RetryPolicy>
getRetryPolicyByException(int maxRetryCount, long retryInterval) {
Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
for (Class<? extends Exception> ex : EXCEPTION_LIST) {
if (ex == TimeoutException.class ||
ex == RaftRetryFailureException.class) {
// retry without sleep
policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
} else {
// retry with fixed sleep between retries
policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
}
}
// Default retry policy
policyMap.put(Exception.class, createRetryPolicy(
maxRetryCount, retryInterval));
return policyMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ public void close() throws IOException {
}
}

boolean isClosed() {
if (outputStream != null) {
return ((BlockOutputStream) outputStream).isClosed();
}
return false;
}

long getTotalAckDataLength() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
Expand Down
Loading