Skip to content
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int retryInterval = 0;

/**
 * Maximum number of times the client retries reading a key after a
 * connectivity exception. Bound to the config key "max.read.retries"
 * (any prefix from the enclosing config group is not visible here);
 * both the annotation default and the field initializer are 3.
 */
@Config(key = "max.read.retries",
defaultValue = "3",
description = "Maximum number of retries by Ozone Client on "
+ "encountering connectivity exception when reading a key.",
tags = ConfigTag.CLIENT)
private int maxReadRetryCount = 3;

/**
 * Time, in seconds, the client waits before retrying a key read after a
 * connectivity exception from a Datanode. Bound to the config key
 * "read.retry.interval"; both the annotation default and the field
 * initializer are 1 (i.e. one second).
 */
@Config(key = "read.retry.interval",
    defaultValue = "1",
    description =
        "Indicates the time duration in seconds a client will wait "
            + "before retrying a read key request on encountering "
            + "a connectivity exception from Datanodes. "
            + "By default the interval is 1 second.",
    tags = ConfigTag.CLIENT)
private int readRetryInterval = 1;

@Config(key = "checksum.type",
defaultValue = "CRC32",
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
Expand Down Expand Up @@ -326,6 +343,22 @@ public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}

/**
 * Reports the current read retry limit held by this config object.
 *
 * @return the value of the {@code maxReadRetryCount} field
 */
public int getMaxReadRetryCount() {
  return this.maxReadRetryCount;
}

/**
 * Overrides the read retry limit held by this config object.
 * The value is stored as given; no range check is applied here.
 *
 * @param maxReadRetryCount new maximum number of read retries
 */
public void setMaxReadRetryCount(int maxReadRetryCount) {
this.maxReadRetryCount = maxReadRetryCount;
}

/**
 * Reports the wait between read retries held by this config object.
 * The config description for this setting states the unit is seconds.
 *
 * @return the value of the {@code readRetryInterval} field
 */
public int getReadRetryInterval() {
  return this.readRetryInterval;
}

/**
 * Overrides the wait between read retries held by this config object.
 * The value is stored as given; no range check is applied here.
 *
 * @param readRetryInterval new retry interval (the config description for
 *     this setting states the unit is seconds)
 */
public void setReadRetryInterval(int readRetryInterval) {
this.readRetryInterval = readRetryInterval;
}

/**
 * Converts the stored checksum type name into its {@code ChecksumType}
 * constant.
 *
 * @return whatever {@code ChecksumType.valueOf} yields for the
 *     {@code checksumType} field
 */
public ChecksumType getChecksumType() {
  final ChecksumType resolved = ChecksumType.valueOf(checksumType);
  return resolved;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

Expand All @@ -39,7 +38,6 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
Expand Down Expand Up @@ -76,8 +74,11 @@ public class BlockInputStream extends BlockExtendedInputStream {
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
// TODO: decide whether the retry policy should change depending on the
// exception that was encountered.
// Hard-coded policy: first argument 3 is the retry count, second argument is
// a one-second interval converted to milliseconds.
private final RetryPolicy retryPolicy =
HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
// Retry policy for this stream. This is an instance-field initializer, so
// getRetryPolicy() is read once, when the stream object is constructed.
// TODO: besides Datanode connectivity issues, which failures should be
// retried?
// NOTE(review): getRetryPolicy() presumably resolves to an inherited static
// accessor; if so, a policy that is set (or changed) after this stream is
// constructed will not be seen here, and the value may be null - confirm.
private final RetryPolicy retryPolicy = getRetryPolicy();
// Former hard-coded policy, kept for reference:
// HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));

private int retries;

// List of ChunkInputStreams, one for each chunk in the block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

/**
* Abstract class which extends InputStream and some common interfaces used by
Expand All @@ -36,6 +40,7 @@ public abstract class ExtendedInputStream extends InputStream
implements Seekable, CanUnbuffer, ByteBufferReadable, StreamCapabilities {

protected static final int EOF = -1;
// Read retry policy kept in a static (class-level) field. It has no
// initializer, so it is null until setRetryPolicy(...) assigns it.
private static RetryPolicy retryPolicy;

@Override
public synchronized int read() throws IOException {
Expand Down Expand Up @@ -101,4 +106,14 @@ public boolean hasCapability(String capability) {
return false;
}
}

/**
 * Builds a retry policy from the client configuration and stores it in the
 * static {@code retryPolicy} field, replacing any previous value.
 * The retry count comes from {@code config.getMaxReadRetryCount()}; the
 * interval from {@code config.getReadRetryInterval()} is treated as seconds
 * and converted to milliseconds before being passed on.
 *
 * @param config client configuration supplying the retry count and interval
 */
public static void setRetryPolicy(OzoneClientConfig config) {
retryPolicy =
HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
Comment on lines +111 to +113
Copy link
Contributor

@adoroszlai adoroszlai Mar 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing this as static seems unsafe.

  1. Code is spread in 3 places:

    • this class
    • class that calls setRetryPolicy
    • class that uses the policy

    It is hard to see if any usage may get outdated or null value.

  2. Concurrent clients may get unexpected results.

}

/**
 * Hands back whatever is currently stored in the static
 * {@code retryPolicy} field.
 *
 * @return the stored policy; {@code null} if the field has not been
 *     assigned yet, since it is declared without an initializer
 */
public static RetryPolicy getRetryPolicy() {
  final RetryPolicy current = retryPolicy;
  return current;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -96,6 +98,8 @@ public void setup() throws Exception {
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
blockStream = new DummyBlockInputStream(blockID, blockSize, pipeline, null,
false, null, refreshFunction, chunks, chunkDataMap);
BlockInputStream.setRetryPolicy(new OzoneConfiguration()
.getObject(OzoneClientConfig.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2259,6 +2259,8 @@ private OzoneInputStream createInputStream(
// When Key is not MPU or when Key is MPU and encryption is not enabled
// Need to revisit for GDP.
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
// Set read retry policy for the streams
KeyInputStream.setRetryPolicy(clientConfig);

if (feInfo == null) {
LengthInputStream lengthInputStream = KeyInputStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.DefaultConfigManager;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -138,6 +139,7 @@ public static void init() throws Exception {
startCluster();
client = cluster.newClient();
createTestData();
KeyInputStream.setRetryPolicy(conf.getObject(OzoneClientConfig.class));
}

private static void createTestData() throws IOException {
Expand Down