Merged
30 commits
a1befcd
fix synchronized
chungen0126 Mar 18, 2024
7f60604
fix checkstyle
chungen0126 Apr 15, 2024
dfd2108
Add ChunkOffsetInBlock to the ChunkInfo proto message to allow checksum…
Sep 24, 2025
e63b367
Refactor to stream entire block to client in a single call. Commented…
Oct 13, 2025
7ee86a3
Implement seek
Oct 14, 2025
28a29ba
Test for and implement no checksum case
Oct 14, 2025
293db1b
Revert "Add ChunkOffsetInBlock to the ChunkInfo proto message to allo…
Oct 14, 2025
e9b7164
Adapted unit tests to the new approach
Oct 15, 2025
d220491
Fix style and remove commented tests
Oct 16, 2025
d4602f3
Remove synchronization from streaming reader
Oct 16, 2025
cbcf804
Remove or comment out unused code
Oct 16, 2025
5595e1d
Throw IOException for op attempts after close
Oct 16, 2025
ebb815d
Move setPipeline into shared superclass
Oct 16, 2025
2ea3019
Fix findbugs warning
Oct 16, 2025
cd05b5e
Fix PMD
Oct 16, 2025
6384937
Fix more findbugs
Oct 16, 2025
4ae36bf
Remove test broken by refactor. It is covered by the integration test…
Oct 16, 2025
712ad79
Fix bug caused by refactor
Oct 17, 2025
03e6344
Tidy up formatting
Oct 17, 2025
a07e7d1
Reuse datanode selection and deadline code in XCeiverGRPC
Oct 17, 2025
c191030
Fix findbugs
Oct 17, 2025
1401849
Retry initializing the reader if all DNs fail via pipeline refresh
Oct 20, 2025
e9457ff
Fix broken test caused by refactor
Oct 21, 2025
4a56d10
Refresh pipeline only for specific exception categories
Oct 21, 2025
bbb99c2
Take and release semaphore when starting / completing streaming reads
Oct 23, 2025
fd1271e
Fix findbugs sync warning
Oct 23, 2025
f9a91a6
Address review comment for handling local errors with onError
Oct 23, 2025
8971f95
Refactor somewhat to address problems when local checksum validation …
Oct 24, 2025
ac95be9
Adding new file missed in last commit
Oct 24, 2025
0d7c7ec
Add missing import
szetszwo Nov 25, 2025
@@ -44,8 +44,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -65,6 +63,7 @@
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
@@ -386,25 +385,9 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
});
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round-robin fashion.
XceiverClientReply reply = new XceiverClientReply(null);
private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto request) throws IOException {
List<DatanodeDetails> datanodeList = null;

DatanodeBlockID blockID = null;
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
blockID = request.getGetBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
blockID = request.getReadChunk().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
blockID = request.getGetSmallFile().getBlock().getBlockID();
}
DatanodeBlockID blockID = getRequestBlockID(request);

if (blockID != null) {
if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
@@ -442,6 +425,33 @@ private XceiverClientReply sendCommandWithRetry(
if (!allInService) {
datanodeList = sortDatanodeByOperationalState(datanodeList);
}
return datanodeList;
}

private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto request) {
DatanodeBlockID blockID = null;
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
blockID = request.getGetBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
blockID = request.getReadChunk().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
blockID = request.getGetSmallFile().getBlock().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
blockID = request.getReadBlock().getBlockID();
}
return blockID;
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round-robin fashion.
XceiverClientReply reply = new XceiverClientReply(null);
List<DatanodeDetails> datanodeList = sortDatanodes(request);

for (DatanodeDetails dn : datanodeList) {
try {
@@ -453,11 +463,7 @@ private XceiverClientReply sendCommandWithRetry(
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
responseProto = sendCommandReadBlock(request, dn).getResponse().get();
} else {
responseProto = sendCommandAsync(request, dn).getResponse().get();
}
responseProto = sendCommandAsync(request, dn).getResponse().get();
if (validators != null && !validators.isEmpty()) {
for (Validator validator : validators) {
validator.accept(request, responseProto);
@@ -510,6 +516,66 @@ private XceiverClientReply sendCommandWithRetry(
}
}

/**
* Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
* {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
* Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set on the passed observer,
* which contains information about the datanode used for the read and the request observer that can be used to
* manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
* streaming reads, so upon successful return of this method the caller must call
* {@link #completeStreamRead(StreamingReadResponse)} to release the semaphore once the streaming read is complete.
* @param request The container command request to initiate the streaming read.
* @param streamObserver The observer that will handle the streamed responses.
* @throws IOException if the streaming read cannot be started on any datanode.
* @throws InterruptedException if interrupted while acquiring the read semaphore.
*/
@Override
public void streamRead(ContainerCommandRequestProto request,
StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
List<DatanodeDetails> datanodeList = sortDatanodes(request);
IOException lastException = null;
for (DatanodeDetails dn : datanodeList) {
try {
checkOpen(dn);
semaphore.acquire();
XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
if (stub == null) {
throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
}
StreamObserver<ContainerCommandRequestProto> requestObserver = stub
.withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(streamObserver);
streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
(ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver));
requestObserver.onNext(request);
requestObserver.onCompleted();
return;
} catch (IOException e) {
LOG.error("Failed to start streaming read to DataNode {}", dn, e);
semaphore.release();
lastException = e;
}
}
if (lastException != null) {
throw lastException;
} else {
throw new IOException("Failed to start streaming read to any available DataNodes");
}
}

/**
* This method should be called to indicate the end of a streaming read. Its primary purpose is to release the
* semaphore acquired when starting the streaming read, but it can also be used to update metrics or debug logs as
* needed.
*/
@Override
public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
semaphore.release();
}
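// A minimal usage sketch (hypothetical caller code, not part of this class;
// MyStreamingReader and its awaitCompletion()/getStreamingReadResponse()
// helpers are assumed names) illustrating the acquire/release contract
// documented on streamRead():
//
//   StreamingReaderSpi reader = new MyStreamingReader();
//   client.streamRead(readBlockRequest, reader);     // acquires the semaphore
//   try {
//     reader.awaitCompletion();                      // wait for onCompleted/onError
//   } finally {
//     // Mandatory: release the semaphore taken in streamRead().
//     client.completeStreamRead(reader.getStreamingReadResponse());
//   }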

private static List<DatanodeDetails> sortDatanodeByOperationalState(
List<DatanodeDetails> datanodeList) {
List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
@@ -629,69 +695,6 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
return new XceiverClientReply(replyFuture);
}

public XceiverClientReply sendCommandReadBlock(
ContainerCommandRequestProto request, DatanodeDetails dn)
throws IOException, InterruptedException {

CompletableFuture<ContainerCommandResponseProto> future =
new CompletableFuture<>();
ContainerCommandResponseProto.Builder response =
ContainerCommandResponseProto.newBuilder();
ContainerProtos.ReadBlockResponseProto.Builder readBlock =
ContainerProtos.ReadBlockResponseProto.newBuilder();
checkOpen(dn);
DatanodeID dnId = dn.getID();
Type cmdType = request.getCmdType();
semaphore.acquire();
long requestTime = System.currentTimeMillis();
metrics.incrPendingContainerOpsMetrics(cmdType);

final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(
ContainerCommandResponseProto responseProto) {
if (responseProto.getResult() == Result.SUCCESS) {
readBlock.addReadChunk(responseProto.getReadChunk());
} else {
future.complete(
ContainerCommandResponseProto.newBuilder(responseProto)
.setCmdType(Type.ReadBlock).build());
}
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
metrics.decrPendingContainerOpsMetrics(cmdType);
metrics.addContainerOpsLatency(
cmdType, Time.monotonicNow() - requestTime);
semaphore.release();
}

@Override
public void onCompleted() {
if (readBlock.getReadChunkCount() > 0) {
future.complete(response.setReadBlock(readBlock)
.setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
}
if (!future.isDone()) {
future.completeExceptionally(new IOException(
"Stream completed but no reply for request " +
processForDebug(request)));
}
metrics.decrPendingContainerOpsMetrics(cmdType);
metrics.addContainerOpsLatency(
cmdType, System.currentTimeMillis() - requestTime);
semaphore.release();
}
});
requestObserver.onNext(request);
requestObserver.onCompleted();
return new XceiverClientReply(future);
}

private synchronized void checkOpen(DatanodeDetails dn)
throws IOException {
if (closed) {
@@ -17,7 +17,20 @@

package org.apache.hadoop.hdds.scm.storage;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.io.grpc.Status;

/**
* Abstract class used as an interface for input streams related to Ozone
@@ -26,6 +39,8 @@
public abstract class BlockExtendedInputStream extends ExtendedInputStream
implements PartInputStream {

private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(BlockExtendedInputStream.class);

public abstract BlockID getBlockID();

@Override
@@ -38,4 +53,86 @@ public long getRemaining() {

@Override
public abstract long getPos();

protected Pipeline setPipeline(Pipeline pipeline) throws IOException {
if (pipeline == null) {
return null;
}
long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();

if (replicaIndexes > 1) {
throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
pipeline));
}

// irrespective of the container state, we will always read via Standalone protocol.
boolean okForRead = pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
|| pipeline.getType() == HddsProtos.ReplicationType.EC;
return okForRead ? pipeline : pipeline.copyForRead();
}
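// Sketch of the intended call site (hypothetical subclass code; blockInfo and
// pipelineRef are illustrative names): normalize the pipeline once during
// initialization so subsequent reads use a read-capable pipeline, e.g. a
// RATIS pipeline converted via copyForRead().
//
//   Pipeline readPipeline = setPipeline(blockInfo.getPipeline());
//   pipelineRef.set(readPipeline);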

protected boolean shouldRetryRead(IOException cause, RetryPolicy retryPolicy, int retries) throws IOException {
RetryPolicy.RetryAction retryAction;
try {
retryAction = retryPolicy.shouldRetry(cause, retries, 0, true);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
if (retryAction.delayMillis > 0) {
try {
LOG.debug("Retry read after {}ms", retryAction.delayMillis);
Thread.sleep(retryAction.delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy;
throw new IOException(msg, e);
}
}
return true;
}
return false;
}
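// Sketch of the retry loop these helpers support (hypothetical; readOnce() is
// an assumed single-attempt read). Note that shouldRetryRead() already sleeps
// for the policy's backoff delay, so the loop only decides whether to retry.
//
//   RetryPolicy policy = getReadRetryPolicy(config);
//   int retries = 0;
//   while (true) {
//     try {
//       return readOnce(buffer);
//     } catch (IOException e) {
//       if (!shouldRetryRead(e, policy, ++retries)) {
//         throw e;   // retries exhausted or non-retriable failure
//       }
//     }
//   }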

protected RetryPolicy getReadRetryPolicy(OzoneClientConfig config) {
return HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
}

protected void refreshBlockInfo(IOException cause, BlockID blockID, AtomicReference<Pipeline> pipelineRef,
AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef, Function<BlockID, BlockLocationInfo> refreshFunction)
throws IOException {
LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}",
blockID, pipelineRef.get().getId(), cause.getMessage());
if (refreshFunction != null) {
LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
if (blockLocationInfo == null) {
LOG.warn("No new block location info for block {}", blockID);
} else {
pipelineRef.set(setPipeline(blockLocationInfo.getPipeline()));
LOG.info("New pipeline for block {}: {}", blockID, blockLocationInfo.getPipeline());

tokenRef.set(blockLocationInfo.getToken());
if (blockLocationInfo.getToken() != null) {
OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
tokenId.readFromByteArray(tokenRef.get().getIdentifier());
LOG.info("A new token is added for block {}. Expiry: {}", blockID,
Instant.ofEpochMilli(tokenId.getExpiryDate()));
}
}
} else {
throw cause;
}
}

/**
* Check if this exception is because datanodes are not reachable.
*/
protected boolean isConnectivityIssue(IOException ex) {
return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
}
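// Failure-path sketch combining the helpers above (hypothetical; assumes the
// concrete stream holds pipelineRef/tokenRef fields and a refreshFunction):
// refresh the pipeline and token only when the failure looks like unreachable
// datanodes, otherwise rethrow immediately.
//
//   private void handleReadFailure(IOException e) throws IOException {
//     if (isConnectivityIssue(e)) {
//       refreshBlockInfo(e, getBlockID(), pipelineRef, tokenRef, refreshFunction);
//     } else {
//       throw e;
//     }
//   }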

}