Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
136 commits
Select commit Hold shift + click to select a range
d6d7b2a
implement data stream api
chungen0126 Mar 11, 2024
d8a519d
implement XceiverClientGrpc#sendCommandOnlyRead
chungen0126 Mar 11, 2024
ffebd66
read data to buffers
chungen0126 Mar 12, 2024
a6ed056
create NewBlockInputStream to support Streaming data
chungen0126 Mar 17, 2024
fbd20eb
fix checkstyle
chungen0126 Mar 18, 2024
a663604
fix checkstyle
chungen0126 Mar 18, 2024
0f35371
fix synchronized
chungen0126 Mar 18, 2024
80329dc
fix synchronized
chungen0126 Mar 18, 2024
e398d17
fix synchronized
chungen0126 Mar 18, 2024
a404266
fix synchronized
chungen0126 Mar 18, 2024
c57c2e6
fix synchronized
chungen0126 Mar 18, 2024
4dc9082
fix synchronized
chungen0126 Mar 18, 2024
af4a25b
ignore find bugs in TestNewBlockInputStream
chungen0126 Mar 18, 2024
79ae3eb
clean up
chungen0126 Mar 18, 2024
b1b301e
implement server side stream data
chungen0126 Apr 9, 2024
ad0fd8d
fix bug
chungen0126 Apr 9, 2024
e86aee9
fix bug
chungen0126 Apr 9, 2024
5442761
Merge branch 'master' into HDDS-10338
chungen0126 Apr 10, 2024
8d97d47
fix bug
chungen0126 Apr 12, 2024
b694448
fix bug
chungen0126 Apr 12, 2024
d5dc908
fix bug
chungen0126 Apr 12, 2024
74eac1e
fix bug
chungen0126 Apr 15, 2024
29c8f80
fix bug
chungen0126 Apr 15, 2024
741effb
fix bug
chungen0126 Apr 15, 2024
f1d4d7f
fix checkstyle
chungen0126 Apr 15, 2024
6a67375
fix bug
chungen0126 Apr 16, 2024
b0c64d7
Merge branch 'master' into HDDS-10338
chungen0126 Apr 16, 2024
b4cfd3f
fix checkstyle
chungen0126 Apr 16, 2024
91631ac
fix bug
chungen0126 Apr 16, 2024
6e36ec1
fix bug
chungen0126 Apr 17, 2024
e0b1d2b
fix bug
chungen0126 Apr 17, 2024
0e8576e
fix bug
chungen0126 Apr 17, 2024
f243c46
fix bug
chungen0126 Apr 17, 2024
1d8500f
Merge branch 'master' into HDDS-10338
chungen0126 Apr 17, 2024
80eb936
fix bug
chungen0126 Apr 17, 2024
7afa66b
Merge branch 'master' into HDDS-10338
chungen0126 May 2, 2024
b26b51d
fix protobuf
chungen0126 May 2, 2024
c4e8b2b
rename NewBlockInputStream
chungen0126 May 27, 2024
56d8ca9
revert sendCommandWithRetry
chungen0126 Jul 18, 2024
7807741
Merge branch 'master' into HDDS-10338
chungen0126 Jul 18, 2024
a8ea3ec
Merge branch 'master' into HDDS-10338
chungen0126 Jul 18, 2024
57b874c
make strea block configurable
chungen0126 Jul 18, 2024
7b98108
fix checkstyle
chungen0126 Jul 18, 2024
db82e3b
remove TestStreamBlockInput
chungen0126 Jul 18, 2024
158d2e4
fix checkstyle
chungen0126 Jul 18, 2024
4935668
Merge branch 'master' into HDDS-10338
chungen0126 Aug 14, 2024
d87d2c4
Merge branch 'master' into HDDS-10338
chungen0126 Sep 4, 2024
4b5e14c
Merge branch 'master' into HDDS-10338
chungen0126 Sep 4, 2024
0d0e31c
Merge branch 'master' into HDDS-10338
chungen0126 Sep 5, 2024
d7e9c92
Merge branch 'master' into HDDS-10338
chungen0126 Sep 13, 2024
3cefe36
update for compabatbilities and add some tests
chungen0126 Sep 25, 2024
61b79ae
default streamReadBlock as false
chungen0126 Sep 25, 2024
5fb1478
log BlockInputStream Initializing
chungen0126 Sep 30, 2024
b857fba
remove ReadBlockResponse
chungen0126 Oct 3, 2024
92505bc
remove ReadBlockResponse
chungen0126 Oct 3, 2024
8202468
remove V0
chungen0126 Oct 3, 2024
a1925c4
fix checkstyle
chungen0126 Oct 3, 2024
ef23455
remove V0
chungen0126 Oct 3, 2024
7a7388f
fix checkstyle
chungen0126 Oct 4, 2024
7bbe3d9
adress comments
chungen0126 Oct 5, 2024
71094bd
add testReadBlock in TestKeyValueHandler and rename variable
chungen0126 Oct 8, 2024
9e5f77c
fix checkstyle and fix bug
chungen0126 Oct 8, 2024
281e91e
revert StreamObserver<ContainerCommandResponseProto>.onComplete
chungen0126 Oct 8, 2024
397ef40
create functions to handle exception
chungen0126 Oct 9, 2024
fe5f8ec
address comments
chungen0126 Oct 9, 2024
9566b89
address comments
chungen0126 Oct 16, 2024
200da6d
address comments
chungen0126 Oct 16, 2024
0814a22
address comments
chungen0126 Oct 18, 2024
e040768
address comment
chungen0126 Oct 18, 2024
ece66fc
Merge branch 'master' into HDDS-10338
chungen0126 Nov 8, 2024
398692e
fix DummyStreamBlockInput
chungen0126 Nov 15, 2024
fead0b7
rmove StreamData type
chungen0126 Nov 15, 2024
2fb2851
fix checkstyle
chungen0126 Nov 15, 2024
dec005d
Merge branch 'master' into HDDS-10338
chungen0126 Nov 25, 2024
1968756
Merge branch 'master' into HDDS-10338
chungen0126 Nov 25, 2024
5c70fd4
fix verify checksum
chungen0126 Nov 27, 2024
bc804ff
no need to compute startByteIndex
chungen0126 Nov 28, 2024
0e4e41e
address comments
chungen0126 Dec 3, 2024
f8e5f28
address comments
chungen0126 Dec 3, 2024
228de8d
add read empty block in TestStreamBlockInputStream.java
chungen0126 Dec 3, 2024
c24da9b
address comments
chungen0126 Dec 6, 2024
f457ac2
fix checkstyle
chungen0126 Dec 6, 2024
c1fbad2
address comments
chungen0126 Dec 12, 2024
5e2b3cf
Merge branch 'master' into HDDS-10338
chungen0126 Jan 30, 2025
ba100c8
fix conflict
chungen0126 Jan 30, 2025
1fec95f
Merge remote-tracking branch 'origin/master' into HDDS-10338
adoroszlai Feb 15, 2025
7daa875
Merge branch 'master' into HDDS-10338
chungen0126 Mar 16, 2025
5db133a
fix checks
chungen0126 Mar 16, 2025
2bf716d
fix checks
chungen0126 Mar 17, 2025
d53bdcb
fix checkstyle
chungen0126 Mar 17, 2025
aa51cbb
fix test
chungen0126 Mar 19, 2025
03986e4
Merge branch 'master' into HDDS-10338
jojochuang Mar 25, 2025
2d07ec0
Merge branch 'master' into HDDS-10338
chungen0126 Jun 17, 2025
d39fd6b
fix checkstyle
chungen0126 Jun 17, 2025
7656d66
fix find bug
chungen0126 Jun 17, 2025
b794dd9
fix checkstyle
chungen0126 Jun 17, 2025
eec8afb
fix find bug
chungen0126 Jun 17, 2025
dbab8e2
Merge branch 'master' into HDDS-10338
chungen0126 Jun 25, 2025
fc95079
fix test
chungen0126 Jun 25, 2025
ec0aa17
Merge branch 'master' into streaming
Sep 19, 2025
3f2396f
Fix compile error due to no scanner message
Sep 23, 2025
1f90a0a
A ChunkOffsetInBlock to the ChunkInfo proto message to allow checksum…
Sep 24, 2025
009938c
Merge branch 'master' into streaming
Sep 24, 2025
8ba23fc
Remove open telemetery reference
Sep 24, 2025
7a3cd39
Fix pmd error
Sep 24, 2025
815a0cb
Merge branch 'master' into streaming
Sep 29, 2025
4c91e42
Refactor to stream entire block to client in a single call. Commented…
Oct 13, 2025
e2cdb12
Implement seek
Oct 14, 2025
f729fcc
Test for and implement no checksum case
Oct 14, 2025
94ad468
Revert "Add ChunkOffsetInBlock to the ChunkInfo proto message to allo…
Oct 14, 2025
03898c2
Adapted unit tests to the new approach
Oct 15, 2025
10f3094
Fix style and remove commented tests
Oct 16, 2025
877df66
Remove synchronization from streaming reader
Oct 16, 2025
5525770
Remove or comment out unused code
Oct 16, 2025
325e833
Merge branch 'master' into streaming
Oct 16, 2025
8d3ddff
Throw IOException for ops attempts after closed
Oct 16, 2025
7a37fcc
Move setPipeline into shared superclass
Oct 16, 2025
559c8a1
Fix findbugs warning
Oct 16, 2025
71c89f0
Fix PMD
Oct 16, 2025
aee40b4
Fix more findbugs
Oct 16, 2025
5b99b8d
Remove test broken by refactor. It is covered by the integration test…
Oct 16, 2025
3f117b7
Fix bug caused by refactor
Oct 17, 2025
7234baa
Tidy up formatting
Oct 17, 2025
af70910
Reuse datanode selection and deadline code in XCeiverGRPC
Oct 17, 2025
4b37064
Fix findbugs
Oct 17, 2025
4d74479
Retry initializing the reader if all DNs fail via pipeline refresh
Oct 20, 2025
6bd9885
Fix broken test caused by refactor
Oct 21, 2025
b12e235
Refresh pipeline only for specific exception catagories
Oct 21, 2025
27d4c66
Merge branch 'master' into streaming
Oct 21, 2025
6e4463e
Take and release semaphore when starting / completing streaming reads
Oct 23, 2025
ca61312
Fix findbugs sync warning
Oct 23, 2025
79ceaa9
Address review comment for handling local errors with onError
Oct 23, 2025
5b70412
Refactor somewhat to address problems when local checksum validation …
Oct 24, 2025
726f8e4
Adding new file missed in last commit
Oct 24, 2025
117fa17
Implement flow control to backoff the stream when the client is slow
Nov 18, 2025
4375666
Update hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/st…
sodonnel Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private long streamBufferMaxSize = 32 * 1024 * 1024;

@Config(key = "stream.readblock.enable",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Allow ReadBlock to stream all the readChunk in one request.",
tags = ConfigTag.CLIENT)
private boolean streamReadBlock = false;

@Config(key = "ozone.client.max.retries",
defaultValue = "5",
description = "Maximum number of retries by Ozone Client on "
Expand Down Expand Up @@ -151,7 +158,7 @@ public class OzoneClientConfig {
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
+ "determines which algorithm would be used to compute checksum for "
+ "chunk data. Default checksum type is CRC32.",
tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
private String checksumType = ChecksumType.CRC32.name();

@Config(key = "ozone.client.bytes.per.checksum",
Expand All @@ -160,7 +167,7 @@ public class OzoneClientConfig {
description = "Checksum will be computed for every bytes per checksum "
+ "number of bytes and stored sequentially. The minimum value for "
+ "this config is 8KB.",
tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
private int bytesPerChecksum = 16 * 1024;

@Config(key = "ozone.client.verify.checksum",
Expand Down Expand Up @@ -538,6 +545,14 @@ public int getMaxConcurrentWritePerKey() {
return this.maxConcurrentWritePerKey;
}

public boolean isStreamReadBlock() {
return streamReadBlock;
}

public void setStreamReadBlock(boolean streamReadBlock) {
this.streamReadBlock = streamReadBlock;
}

/**
* Enum for indicating what mode to use when combining chunk and block
* checksums to define an aggregate FileChecksum. This should be considered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,25 +384,9 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
});
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round-robin fashion.
XceiverClientReply reply = new XceiverClientReply(null);
private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto request) throws IOException {
List<DatanodeDetails> datanodeList = null;

DatanodeBlockID blockID = null;
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
blockID = request.getGetBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
blockID = request.getReadChunk().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
blockID = request.getGetSmallFile().getBlock().getBlockID();
}
DatanodeBlockID blockID = getRequestBlockID(request);

if (blockID != null) {
if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
Expand Down Expand Up @@ -440,6 +424,33 @@ private XceiverClientReply sendCommandWithRetry(
if (!allInService) {
datanodeList = sortDatanodeByOperationalState(datanodeList);
}
return datanodeList;
}

private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto request) {
DatanodeBlockID blockID = null;
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
blockID = request.getGetBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
blockID = request.getReadChunk().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
blockID = request.getGetSmallFile().getBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
blockID = request.getReadBlock().getBlockID();
}
return blockID;
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round-robin fashion.
XceiverClientReply reply = new XceiverClientReply(null);
List<DatanodeDetails> datanodeList = sortDatanodes(request);

for (DatanodeDetails dn : datanodeList) {
try {
Expand Down Expand Up @@ -495,7 +506,7 @@ private XceiverClientReply sendCommandWithRetry(
String message = "Failed to execute command {}";
if (LOG.isDebugEnabled()) {
LOG.debug(message + " on the pipeline {}.",
processForDebug(request), pipeline);
processForDebug(request), pipeline);
} else {
LOG.warn(message + " on the pipeline {}.",
request.getCmdType(), pipeline);
Expand All @@ -504,6 +515,62 @@ private XceiverClientReply sendCommandWithRetry(
}
}

/**
* Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
* {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
* Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set into the pass StreamObserver,
* which contains information about the datanode used for the read, and the request observer that can be used to
* manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
* streaming reads so upon successful return of this method, the caller must ensure to call
* {@link #completeStreamRead()} to release the semaphore once the streaming read is complete.
* @param request The container command request to initiate the streaming read.
* @param streamObserver The observer that will handle the streamed responses.=
* @throws IOException
* @throws InterruptedException
*/
@Override
public void streamRead(ContainerCommandRequestProto request,
StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
List<DatanodeDetails> datanodeList = sortDatanodes(request);
IOException lastException = null;
for (DatanodeDetails dn : datanodeList) {
try {
checkOpen(dn);
semaphore.acquire();
XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
if (stub == null) {
throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
}
stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)
.streamBlock(request, streamObserver);
streamObserver.setStreamingDatanode(dn);
return;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we do that only for one node?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it iterates over the list of available datanodes, and uses the first one that works. If the first one fails to connect, it will try to use the next one etc.

} catch (IOException e) {
LOG.error("Failed to start streaming read to DataNode {}", dn, e);
semaphore.release();
lastException = e;
}
}
if (lastException != null) {
throw lastException;
} else {
throw new IOException("Failed to start streaming read to any available DataNodes");
}
}

/**
* This method should be called to indicate the end of streaming read. Its primary purpose is to release the
* semaphore acquired when starting the streaming read, but is also used to update any metrics or debug logs as
* needed.
*/
@Override
public void completeStreamRead() {
semaphore.release();
}

private static List<DatanodeDetails> sortDatanodeByOperationalState(
List<DatanodeDetails> datanodeList) {
List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,20 @@

package org.apache.hadoop.hdds.scm.storage;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.io.grpc.Status;

/**
* Abstract class used as an interface for input streams related to Ozone
Expand All @@ -26,6 +39,8 @@
public abstract class BlockExtendedInputStream extends ExtendedInputStream
implements PartInputStream {

private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(BlockExtendedInputStream.class);

public abstract BlockID getBlockID();

@Override
Expand All @@ -38,4 +53,86 @@ public long getRemaining() {

@Override
public abstract long getPos();

protected Pipeline setPipeline(Pipeline pipeline) throws IOException {
if (pipeline == null) {
return null;
}
long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();

if (replicaIndexes > 1) {
throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
pipeline));
}

// irrespective of the container state, we will always read via Standalone protocol.
boolean okForRead = pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
|| pipeline.getType() == HddsProtos.ReplicationType.EC;
return okForRead ? pipeline : pipeline.copyForRead();
}

protected boolean shouldRetryRead(IOException cause, RetryPolicy retryPolicy, int retries) throws IOException {
RetryPolicy.RetryAction retryAction;
try {
retryAction = retryPolicy.shouldRetry(cause, retries, 0, true);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
if (retryAction.delayMillis > 0) {
try {
LOG.debug("Retry read after {}ms", retryAction.delayMillis);
Thread.sleep(retryAction.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy;
throw new IOException(msg, e);
}
}
return true;
}
return false;
}

protected RetryPolicy getReadRetryPolicy(OzoneClientConfig config) {
return HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
}

protected void refreshBlockInfo(IOException cause, BlockID blockID, AtomicReference<Pipeline> pipelineRef,
AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef, Function<BlockID, BlockLocationInfo> refreshFunction)
throws IOException {
LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}",
blockID, pipelineRef.get().getId(), cause.getMessage());
if (refreshFunction != null) {
LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
if (blockLocationInfo == null) {
LOG.warn("No new block location info for block {}", blockID);
} else {
pipelineRef.set(setPipeline(blockLocationInfo.getPipeline()));
LOG.info("New pipeline for block {}: {}", blockID, blockLocationInfo.getPipeline());

tokenRef.set(blockLocationInfo.getToken());
if (blockLocationInfo.getToken() != null) {
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
tokenId.readFromByteArray(tokenRef.get().getIdentifier());
LOG.info("A new token is added for block {}. Expiry: {}", blockID,
Instant.ofEpochMilli(tokenId.getExpiryDate()));
}
}
} else {
throw cause;
}
}

/**
* Check if this exception is because datanodes are not reachable.
*/
protected boolean isConnectivityIssue(IOException ex) {
return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
}

}
Loading
Loading