diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 3042b4d847a0..32ba6081e4fc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -144,6 +144,23 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private int retryInterval = 0; + @Config(key = "max.read.retries", + defaultValue = "3", + description = "Maximum number of retries by Ozone Client on " + + "encountering connectivity exception when reading a key.", + tags = ConfigTag.CLIENT) + private int maxReadRetryCount = 3; + + @Config(key = "read.retry.interval", + defaultValue = "1", + description = + "Indicates the time duration in seconds a client will wait " + + "before retrying a read key request on encountering " + + "a connectivity excepetion from Datanodes . " + + "By default the interval is 1millisecond", + tags = ConfigTag.CLIENT) + private int readRetryInterval = 1; + @Config(key = "checksum.type", defaultValue = "CRC32", description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] " @@ -326,6 +343,22 @@ public void setRetryInterval(int retryInterval) { this.retryInterval = retryInterval; } + public int getMaxReadRetryCount() { + return maxReadRetryCount; + } + + public void setMaxReadRetryCount(int maxReadRetryCount) { + this.maxReadRetryCount = maxReadRetryCount; + } + + public int getReadRetryInterval() { + return readRetryInterval; + } + + public void setReadRetryInterval(int readRetryInterval) { + this.readRetryInterval = readRetryInterval; + } + public ChecksumType getChecksumType() { return ChecksumType.valueOf(checksumType); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 1fb4bf954c74..73d9df913b9a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -39,7 +38,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; @@ -76,8 +74,8 @@ public class BlockInputStream extends BlockExtendedInputStream { private XceiverClientSpi xceiverClient; private boolean initialized = false; // TODO: do we need to change retrypolicy based on exception. - private final RetryPolicy retryPolicy = - HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1)); + private final RetryPolicy retryPolicy = getRetryPolicy(); + private int retries; // List of ChunkInputStreams, one for each chunk in the block diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java index 3e78abbf485a..2fc88ddc548d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java @@ -22,11 +22,15 @@ import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; /** * Abstact class which extends InputStream and some common interfaces used by @@ -36,6 +40,7 @@ public abstract class ExtendedInputStream extends InputStream implements Seekable, CanUnbuffer, ByteBufferReadable, StreamCapabilities { protected static final int EOF = -1; + private static RetryPolicy retryPolicy; @Override public synchronized int read() throws IOException { @@ -101,4 +106,14 @@ public boolean hasCapability(String capability) { return false; } } + + public static void setRetryPolicy(OzoneClientConfig config) { + retryPolicy = + HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), + TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); + } + + public static RetryPolicy getRetryPolicy() { + return retryPolicy; + } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 7755adc5f33e..b6d3a4cedfac 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -22,8 +22,10 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -96,6 +98,8 @@ public void setup() throws Exception { Pipeline pipeline = MockPipeline.createSingleNodePipeline(); blockStream = new DummyBlockInputStream(blockID, blockSize, pipeline, null, false, null, refreshFunction, chunks, chunkDataMap); + BlockInputStream.setRetryPolicy(new OzoneConfiguration() + .getObject(OzoneClientConfig.class)); } /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index bcb08f1d9130..7b4bafa30dae 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -2259,6 +2259,8 @@ private OzoneInputStream createInputStream( // When Key is not MPU or when Key is MPU and encryption is not enabled // Need to revisit for GDP. FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo(); + // Set read retry policy for the streams + KeyInputStream.setRetryPolicy(clientConfig); if (feInfo == null) { LengthInputStream lengthInputStream = KeyInputStream diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java index a04c1236186c..59dac617664e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.conf.DefaultConfigManager; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -138,6 +139,7 @@ public static void init() throws Exception { startCluster(); client = cluster.newClient(); createTestData(); + KeyInputStream.setRetryPolicy(conf.getObject(OzoneClientConfig.class)); } private static void createTestData() throws IOException {