diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 09e01593feb1..b6a3d00f010f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -44,8 +44,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc; import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -65,6 +63,7 @@ import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder; import org.slf4j.Logger; @@ -386,25 +385,9 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry( }); } - private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List validators) - throws IOException { - ContainerCommandResponseProto responseProto = null; - IOException ioException = null; - - // In case of an exception or an error, we will try to read from the - // datanodes in the pipeline in a round-robin fashion. 
-    XceiverClientReply reply = new XceiverClientReply(null);
+  private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto request) throws IOException {
     List<DatanodeDetails> datanodeList = null;
-
-    DatanodeBlockID blockID = null;
-    if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
-      blockID = request.getGetBlock().getBlockID();
-    } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
-      blockID = request.getReadChunk().getBlockID();
-    } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
-      blockID = request.getGetSmallFile().getBlock().getBlockID();
-    }
+    DatanodeBlockID blockID = getRequestBlockID(request);

     if (blockID != null) {
       if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
@@ -442,6 +425,33 @@ private XceiverClientReply sendCommandWithRetry(
     if (!allInService) {
       datanodeList = sortDatanodeByOperationalState(datanodeList);
     }
+    return datanodeList;
+  }
+
+  private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto request) {
+    DatanodeBlockID blockID = null;
+    if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
+      blockID = request.getGetBlock().getBlockID();
+    } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
+      blockID = request.getReadChunk().getBlockID();
+    } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
+      blockID = request.getGetSmallFile().getBlock().getBlockID();
+    } else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
+      blockID = request.getReadBlock().getBlockID();
+    }
+    return blockID;
+  }
+
+  private XceiverClientReply sendCommandWithRetry(
+      ContainerCommandRequestProto request, List<Validator> validators)
+      throws IOException {
+    ContainerCommandResponseProto responseProto = null;
+    IOException ioException = null;
+
+    // In case of an exception or an error, we will try to read from the
+    // datanodes in the pipeline in a round-robin fashion.
+    XceiverClientReply reply = new XceiverClientReply(null);
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);

     for (DatanodeDetails dn : datanodeList) {
       try {
@@ -453,11 +463,7 @@ private XceiverClientReply sendCommandWithRetry(
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
         reply.addDatanode(dn);
-        if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
-          responseProto = sendCommandReadBlock(request, dn).getResponse().get();
-        } else {
-          responseProto = sendCommandAsync(request, dn).getResponse().get();
-        }
+        responseProto = sendCommandAsync(request, dn).getResponse().get();
         if (validators != null && !validators.isEmpty()) {
           for (Validator validator : validators) {
             validator.accept(request, responseProto);
@@ -510,6 +516,66 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }

+  /**
+   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+   * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set into the passed
+   * StreamObserver, which contains information about the datanode used for the read, and the request observer that
+   * can be used to manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number
+   * of concurrent streaming reads, so upon successful return of this method the caller must call
+   * {@link #completeStreamRead(StreamingReadResponse)} to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming read.
+   * @param streamObserver The observer that will handle the streamed responses.
+   * @throws IOException if the streaming read could not be started on any datanode in the pipeline.
+   * @throws InterruptedException if interrupted while acquiring the read semaphore.
+   */
+  @Override
+  public void streamRead(ContainerCommandRequestProto request,
+      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn);
+        }
+        StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+            .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(streamObserver);
+        streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
+            (ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver));
+        requestObserver.onNext(request);
+        requestObserver.onCompleted();
+        return;
+      } catch (IOException e) {
+        LOG.error("Failed to start streaming read to DataNode {}", dn, e);
+        semaphore.release();
+        lastException = e;
+      }
+    }
+    if (lastException != null) {
+      throw lastException;
+    } else {
+      throw new IOException("Failed to start streaming read to any available DataNodes");
+    }
+  }
+
+  /**
+   * This method should be called to indicate the end of a streaming read. Its primary purpose is to release the
+   * semaphore acquired when starting the streaming read, but it is also used to update any metrics or debug logs
+   * as needed.
+   */
+  @Override
+  public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+    semaphore.release();
+  }
+
   private static List<DatanodeDetails> sortDatanodeByOperationalState(
       List<DatanodeDetails> datanodeList) {
     List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
@@ -629,69 +695,6 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
     return new XceiverClientReply(replyFuture);
   }

-  public XceiverClientReply sendCommandReadBlock(
-      ContainerCommandRequestProto request, DatanodeDetails dn)
-      throws IOException, InterruptedException {
-
-    CompletableFuture<ContainerCommandResponseProto> future =
-        new CompletableFuture<>();
-    ContainerCommandResponseProto.Builder response =
-        ContainerCommandResponseProto.newBuilder();
-    ContainerProtos.ReadBlockResponseProto.Builder readBlock =
-        ContainerProtos.ReadBlockResponseProto.newBuilder();
-    checkOpen(dn);
-    DatanodeID dnId = dn.getID();
-    Type cmdType = request.getCmdType();
-    semaphore.acquire();
-    long requestTime = System.currentTimeMillis();
-    metrics.incrPendingContainerOpsMetrics(cmdType);
-
-    final StreamObserver<ContainerCommandRequestProto> requestObserver =
-        asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
-            .send(new StreamObserver<ContainerCommandResponseProto>() {
-              @Override
-              public void onNext(
-                  ContainerCommandResponseProto responseProto) {
-                if (responseProto.getResult() == Result.SUCCESS) {
-                  readBlock.addReadChunk(responseProto.getReadChunk());
-                } else {
-                  future.complete(
-                      ContainerCommandResponseProto.newBuilder(responseProto)
-                          .setCmdType(Type.ReadBlock).build());
-                }
-              }
-
-              @Override
-              public void onError(Throwable t) {
-                future.completeExceptionally(t);
-                metrics.decrPendingContainerOpsMetrics(cmdType);
-                metrics.addContainerOpsLatency(
-                    cmdType, Time.monotonicNow() - requestTime);
-                semaphore.release();
-              }
-
-              @Override
-              public void onCompleted() {
-                if (readBlock.getReadChunkCount() > 0) {
-
future.complete(response.setReadBlock(readBlock) - .setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build()); - } - if (!future.isDone()) { - future.completeExceptionally(new IOException( - "Stream completed but no reply for request " + - processForDebug(request))); - } - metrics.decrPendingContainerOpsMetrics(cmdType); - metrics.addContainerOpsLatency( - cmdType, System.currentTimeMillis() - requestTime); - semaphore.release(); - } - }); - requestObserver.onNext(request); - requestObserver.onCompleted(); - return new XceiverClientReply(future); - } - private synchronized void checkOpen(DatanodeDetails dn) throws IOException { if (closed) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java index 6753f600a91b..dc47a9edacaf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java @@ -17,7 +17,20 @@ package org.apache.hadoop.hdds.scm.storage; +import java.io.IOException; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.token.Token; +import org.apache.ratis.thirdparty.io.grpc.Status; /** * Abstract class used as an interface for input streams related to Ozone @@ -26,6 +39,8 @@ public abstract class BlockExtendedInputStream extends ExtendedInputStream implements PartInputStream { + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(BlockExtendedInputStream.class); + public abstract BlockID getBlockID(); @Override @@ -38,4 +53,86 @@ public long getRemaining() { @Override public abstract long getPos(); + + protected Pipeline setPipeline(Pipeline pipeline) throws IOException { + if (pipeline == null) { + return null; + } + long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count(); + + if (replicaIndexes > 1) { + throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.", + pipeline)); + } + + // irrespective of the container state, we will always read via Standalone protocol. + boolean okForRead = pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE + || pipeline.getType() == HddsProtos.ReplicationType.EC; + return okForRead ? 
pipeline : pipeline.copyForRead(); + } + + protected boolean shouldRetryRead(IOException cause, RetryPolicy retryPolicy, int retries) throws IOException { + RetryPolicy.RetryAction retryAction; + try { + retryAction = retryPolicy.shouldRetry(cause, retries, 0, true); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { + if (retryAction.delayMillis > 0) { + try { + LOG.debug("Retry read after {}ms", retryAction.delayMillis); + Thread.sleep(retryAction.delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy; + throw new IOException(msg, e); + } + } + return true; + } + return false; + } + + protected RetryPolicy getReadRetryPolicy(OzoneClientConfig config) { + return HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), + TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); + } + + protected void refreshBlockInfo(IOException cause, BlockID blockID, AtomicReference pipelineRef, + AtomicReference> tokenRef, Function refreshFunction) + throws IOException { + LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}", + blockID, pipelineRef.get().getId(), cause.getMessage()); + if (refreshFunction != null) { + LOG.debug("Re-fetching pipeline and block token for block {}", blockID); + BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID); + if (blockLocationInfo == null) { + LOG.warn("No new block location info for block {}", blockID); + } else { + pipelineRef.set(setPipeline(blockLocationInfo.getPipeline())); + LOG.info("New pipeline for block {}: {}", blockID, blockLocationInfo.getPipeline()); + + tokenRef.set(blockLocationInfo.getToken()); + if (blockLocationInfo.getToken() != null) { + OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(); + tokenId.readFromByteArray(tokenRef.get().getIdentifier()); + LOG.info("A new token is added for block {}. Expiry: {}", blockID, + Instant.ofEpochMilli(tokenId.getExpiryDate())); + } + } + } else { + throw cause; + } + } + + /** + * Check if this exception is because datanodes are not reachable. 
+ */ + protected boolean isConnectivityIssue(IOException ex) { + return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); + } + } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index c9731bcc4611..6f6b513422f7 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -22,11 +22,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; @@ -35,19 +33,16 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.security.token.Token; -import org.apache.ratis.thirdparty.io.grpc.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,14 +118,12 @@ public BlockInputStream( this.blockInfo = blockInfo; this.blockID = blockInfo.getBlockID(); this.length = blockInfo.getLength(); - setPipeline(pipeline); + pipelineRef.set(setPipeline(pipeline)); tokenRef.set(token); this.verifyChecksum = config.isChecksumVerify(); this.xceiverClientFactory = xceiverClientFactory; this.refreshFunction = refreshFunction; - this.retryPolicy = - HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), - TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); + this.retryPolicy = getReadRetryPolicy(config); } // only for unit tests @@ -182,7 +175,7 @@ public synchronized void initialize() throws IOException { } catchEx = ex; } - } while (shouldRetryRead(catchEx)); + } while (shouldRetryRead(catchEx, retryPolicy, ++retries)); if (chunks == null) { throw catchEx; @@ -215,37 +208,8 @@ public synchronized void initialize() throws IOException { } } - /** - * Check if this exception is because datanodes are not reachable. 
- */ - private boolean isConnectivityIssue(IOException ex) { - return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); - } - private void refreshBlockInfo(IOException cause) throws IOException { - LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}", - blockID, pipelineRef.get().getId(), cause.getMessage()); - if (refreshFunction != null) { - LOG.debug("Re-fetching pipeline and block token for block {}", blockID); - BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID); - if (blockLocationInfo == null) { - LOG.warn("No new block location info for block {}", blockID); - } else { - setPipeline(blockLocationInfo.getPipeline()); - LOG.info("New pipeline for block {}: {}", blockID, - blockLocationInfo.getPipeline()); - - tokenRef.set(blockLocationInfo.getToken()); - if (blockLocationInfo.getToken() != null) { - OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(); - tokenId.readFromByteArray(tokenRef.get().getIdentifier()); - LOG.info("A new token is added for block {}. Expiry: {}", - blockID, Instant.ofEpochMilli(tokenId.getExpiryDate())); - } - } - } else { - throw cause; - } + refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction); } /** @@ -277,26 +241,6 @@ protected BlockData getBlockDataUsingClient() throws IOException { return response.getBlockData(); } - private void setPipeline(Pipeline pipeline) throws IOException { - if (pipeline == null) { - return; - } - long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count(); - - if (replicaIndexes > 1) { - throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.", - pipeline)); - } - - // irrespective of the container state, we will always read via Standalone - // protocol. - boolean okForRead = - pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE - || pipeline.getType() == HddsProtos.ReplicationType.EC; - Pipeline readPipeline = okForRead ? 
pipeline : pipeline.copyForRead(); - pipelineRef.set(readPipeline); - } - private static void validate(ContainerCommandResponseProto response) throws IOException { if (!response.hasGetBlock()) { @@ -382,14 +326,14 @@ protected synchronized int readWithStrategy(ByteReaderStrategy strategy) } catch (SCMSecurityException ex) { throw ex; } catch (StorageContainerException e) { - if (shouldRetryRead(e)) { + if (shouldRetryRead(e, retryPolicy, ++retries)) { handleReadError(e); continue; } else { throw e; } } catch (IOException ex) { - if (shouldRetryRead(ex)) { + if (shouldRetryRead(ex, retryPolicy, ++retries)) { if (isConnectivityIssue(ex)) { handleReadError(ex); } else { @@ -573,31 +517,6 @@ private synchronized void storePosition() { blockPosition = getPos(); } - private boolean shouldRetryRead(IOException cause) throws IOException { - RetryPolicy.RetryAction retryAction; - try { - retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(e); - } - if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { - if (retryAction.delayMillis > 0) { - try { - LOG.debug("Retry read after {}ms", retryAction.delayMillis); - Thread.sleep(retryAction.delayMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy; - throw new IOException(msg, e); - } - } - return true; - } - return false; - } - private void handleReadError(IOException cause) throws IOException { releaseClient(); final List inputStreams = this.chunkStreams; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index 3db7fd8f6603..cb2f80ca7c83 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -17,46 +17,36 @@ package org.apache.hadoop.hdds.scm.storage; -import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import 
org.apache.hadoop.hdds.scm.StreamingReadResponse; +import org.apache.hadoop.hdds.scm.StreamingReaderSpi; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; -import org.apache.hadoop.ozone.common.utils.BufferUtils; import org.apache.hadoop.security.token.Token; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,31 +56,26 @@ */ public class StreamBlockInputStream extends BlockExtendedInputStream implements Seekable, CanUnbuffer, ByteBufferReadable { - private static final Logger LOG = - LoggerFactory.getLogger(StreamBlockInputStream.class); + private static final Logger LOG = LoggerFactory.getLogger(StreamBlockInputStream.class); + private static final int EOF = -1; + private static final Throwable CANCELLED_EXCEPTION = new Throwable("Cancelled by client"); + private final BlockID blockID; private final long blockLength; - private final AtomicReference pipelineRef = - new AtomicReference<>(); - private final AtomicReference> tokenRef = - new AtomicReference<>(); + private final AtomicReference pipelineRef = new AtomicReference<>(); + private final AtomicReference> tokenRef = new AtomicReference<>(); private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; - private List bufferOffsets; - private int bufferIndex; - private long blockPosition = -1; - private List buffers; - // Checks if the StreamBlockInputStream has already read data from the container. 
- private boolean allocated = false; - private long bufferOffsetWrtBlockData; - private long buffersSize; - private static final int EOF = -1; - private final List validators; + private ByteBuffer buffer; + private long position = 0; + private boolean initialized = false; + private StreamingReader streamingReader; + private final boolean verifyChecksum; private final Function refreshFunction; private final RetryPolicy retryPolicy; - private int retries; + private int retries = 0; public StreamBlockInputStream( BlockID blockID, long length, Pipeline pipeline, @@ -100,17 +85,12 @@ public StreamBlockInputStream( OzoneClientConfig config) throws IOException { this.blockID = blockID; this.blockLength = length; - setPipeline(pipeline); + pipelineRef.set(setPipeline(pipeline)); tokenRef.set(token); this.xceiverClientFactory = xceiverClientFactory; - this.validators = ContainerProtocolCalls.toValidatorList( - (request, response) -> validateBlock(response)); this.verifyChecksum = config.isChecksumVerify(); + this.retryPolicy = getReadRetryPolicy(config); this.refreshFunction = refreshFunction; - this.retryPolicy = - HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), - TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); - } @Override @@ -125,170 +105,54 @@ public long getLength() { @Override public synchronized long getPos() { - if (blockLength == 0) { - return 0; - } - if (blockPosition >= 0) { - return blockPosition; - } - - if (buffersHaveData()) { - // BufferOffset w.r.t to BlockData + BufferOffset w.r.t buffers + - // Position of current Buffer - return bufferOffsetWrtBlockData + bufferOffsets.get(bufferIndex) + - buffers.get(bufferIndex).position(); - } - if (allocated && !dataRemainingInBlock()) { - Preconditions.checkState( - bufferOffsetWrtBlockData + buffersSize == blockLength, - "EOF detected but not at the last byte of the chunk"); - return blockLength; - } - if (buffersAllocated()) { - return bufferOffsetWrtBlockData + buffersSize; - } - return 0; + return position; } @Override public synchronized int read() throws IOException { - int dataout = EOF; - int len = 1; - int available; - while (len > 0) { - try { - acquireClient(); - available = prepareRead(1); - retries = 0; - } catch (SCMSecurityException ex) { - throw ex; - } catch (StorageContainerException e) { - handleStorageContainerException(e); - continue; - } catch (IOException ioe) { - handleIOException(ioe); - continue; - } - if (available == EOF) { - // There is no more data in the chunk stream. The buffers should have - // been released by now - Preconditions.checkState(buffers == null); - } else { - dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get()); - } - - len -= available; - if (bufferEOF()) { - releaseBuffers(bufferIndex); - } + checkOpen(); + if (!dataAvailableToRead()) { + return EOF; } - - - return dataout; - - + position++; + return buffer.get(); } @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - // According to the JavaDocs for InputStream, it is recommended that - // subclasses provide an override of bulk read if possible for performance - // reasons. In addition to performance, we need to do it for correctness - // reasons. The Ozone REST service uses PipedInputStream and - // PipedOutputStream to relay HTTP response data between a Jersey thread and - // a Netty thread. It turns out that PipedInputStream/PipedOutputStream - // have a subtle dependency (bug?) 
on the wrapped stream providing separate - // implementations of single-byte read and bulk read. Without this, get key - // responses might close the connection before writing all of the bytes - // advertised in the Content-Length. - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - int total = 0; - int available; - while (len > 0) { - try { - acquireClient(); - available = prepareRead(len); - retries = 0; - } catch (SCMSecurityException ex) { - throw ex; - } catch (StorageContainerException e) { - handleStorageContainerException(e); - continue; - } catch (IOException ioe) { - handleIOException(ioe); - continue; - } - if (available == EOF) { - // There is no more data in the block stream. The buffers should have - // been released by now - Preconditions.checkState(buffers == null); - return total != 0 ? total : EOF; - } - buffers.get(bufferIndex).get(b, off + total, available); - len -= available; - total += available; + ByteBuffer tmpBuffer = ByteBuffer.wrap(b, off, len); + return read(tmpBuffer); + } - if (bufferEOF()) { - releaseBuffers(bufferIndex); + @Override + public synchronized int read(ByteBuffer targetBuf) throws IOException { + checkOpen(); + int read = 0; + while (targetBuf.hasRemaining()) { + if (!dataAvailableToRead()) { + break; } + int toCopy = Math.min(buffer.remaining(), targetBuf.remaining()); + ByteBuffer tmpBuf = buffer.duplicate(); + tmpBuf.limit(tmpBuf.position() + toCopy); + targetBuf.put(tmpBuf); + buffer.position(tmpBuf.position()); + position += toCopy; + read += toCopy; } - return total; - + return read > 0 ? read : EOF; } - @Override - public synchronized int read(ByteBuffer byteBuffer) throws IOException { - if (byteBuffer == null) { - throw new NullPointerException(); - } - int len = byteBuffer.remaining(); - if (len == 0) { - return 0; + private boolean dataAvailableToRead() throws IOException { + if (position >= blockLength) { + return false; } - int total = 0; - int available; - while (len > 0) { - try { - acquireClient(); - available = prepareRead(len); - retries = 0; - } catch (SCMSecurityException ex) { - throw ex; - } catch (StorageContainerException e) { - handleStorageContainerException(e); - continue; - } catch (IOException ioe) { - handleIOException(ioe); - continue; - } - if (available == EOF) { - // There is no more data in the block stream. The buffers should have - // been released by now - Preconditions.checkState(buffers == null); - return total != 0 ? 
total : EOF; - } - ByteBuffer readBuf = buffers.get(bufferIndex); - ByteBuffer tmpBuf = readBuf.duplicate(); - tmpBuf.limit(tmpBuf.position() + available); - byteBuffer.put(tmpBuf); - readBuf.position(tmpBuf.position()); - - len -= available; - total += available; - - if (bufferEOF()) { - releaseBuffers(bufferIndex); - } + initialize(); + if (buffer == null || buffer.remaining() == 0) { + int loaded = fillBuffer(); + return loaded != EOF; } - return total; + return true; } @Override @@ -298,62 +162,44 @@ protected int readWithStrategy(ByteReaderStrategy strategy) throws IOException { @Override public synchronized void seek(long pos) throws IOException { - if (pos == 0 && blockLength == 0) { - // It is possible for length and pos to be zero in which case - // seek should return instead of throwing exception - return; + checkOpen(); + if (pos < 0) { + throw new IOException("Cannot seek to negative offset"); } - if (pos < 0 || pos > blockLength) { - throw new EOFException("EOF encountered at pos: " + pos + " for block: " + blockID); + if (pos > blockLength) { + throw new IOException("Cannot seek after the end of the block"); } - - if (buffersHavePosition(pos)) { - // The bufferPosition is w.r.t the current block. - // Adjust the bufferIndex and position to the seeked position. - adjustBufferPosition(pos - bufferOffsetWrtBlockData); - } else { - blockPosition = pos; + if (pos == position) { + return; } + closeStream(); + position = pos; } @Override + // The seekable interface indicates that seekToNewSource should seek to a new source of the data, + // ie a different datanode. This is not supported for now. public synchronized boolean seekToNewSource(long l) throws IOException { return false; } @Override public synchronized void unbuffer() { - blockPosition = getPos(); releaseClient(); - releaseBuffers(); } - private void setPipeline(Pipeline pipeline) throws IOException { - if (pipeline == null) { - return; - } - long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count(); - - if (replicaIndexes > 1) { - throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.", - pipeline)); + private void closeStream() { + if (streamingReader != null) { + streamingReader.cancel(); + streamingReader = null; } - - // irrespective of the container state, we will always read via Standalone - // protocol. - boolean okForRead = - pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE - || pipeline.getType() == HddsProtos.ReplicationType.EC; - Pipeline readPipeline = okForRead ? 
pipeline : pipeline.copyForRead().toBuilder() - .setReplicationConfig(StandaloneReplicationConfig.getInstance( - getLegacyFactor(pipeline.getReplicationConfig()))) - .build(); - pipelineRef.set(readPipeline); + initialized = false; + buffer = null; } protected synchronized void checkOpen() throws IOException { if (xceiverClientFactory == null) { - throw new IOException("StreamBlockInputStream has been closed."); + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Block: " + blockID); } } @@ -364,384 +210,234 @@ protected synchronized void acquireClient() throws IOException { try { xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline); } catch (IOException ioe) { - LOG.warn("Failed to acquire client for pipeline {}, block {}", - pipeline, blockID); + LOG.warn("Failed to acquire client for pipeline {}, block {}", pipeline, blockID); throw ioe; } } } - private synchronized int prepareRead(int len) throws IOException { - for (;;) { - if (blockPosition >= 0) { - if (buffersHavePosition(blockPosition)) { - // The current buffers have the seeked position. Adjust the buffer - // index and position to point to the buffer position. - adjustBufferPosition(blockPosition - bufferOffsetWrtBlockData); - } else { - // Read a required block data to fill the buffers with seeked - // position data - readDataFromContainer(len); - } + private void initialize() throws IOException { + if (initialized) { + return; + } + while (true) { + try { + acquireClient(); + streamingReader = new StreamingReader(); + ContainerProtocolCalls.readBlock(xceiverClient, position, blockID, tokenRef.get(), + pipelineRef.get().getReplicaIndexes(), streamingReader); + initialized = true; + return; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + handleExceptions(new IOException("Interrupted", ie)); + } catch (IOException ioe) { + handleExceptions(ioe); } - if (buffersHaveData()) { - // Data is available from buffers - ByteBuffer bb = buffers.get(bufferIndex); - return Math.min(len, bb.remaining()); - } else if (dataRemainingInBlock()) { - // There is more data in the block stream which has not - // been read into the buffers yet. - readDataFromContainer(len); + } + } + + private void handleExceptions(IOException cause) throws IOException { + if (cause instanceof StorageContainerException || isConnectivityIssue(cause)) { + if (shouldRetryRead(cause, retryPolicy, retries++)) { + releaseClient(); + refreshBlockInfo(cause); + LOG.warn("Refreshing block data to read block {} due to {}", blockID, cause.getMessage()); } else { - // All available input from this block stream has been consumed. - return EOF; + throw cause; } + } else { + throw cause; } + } - + private int fillBuffer() throws IOException { + if (!streamingReader.hasNext()) { + return EOF; + } + buffer = streamingReader.readNext(); + return buffer == null ? 
EOF : buffer.limit(); } - private boolean buffersHavePosition(long pos) { - // Check if buffers have been allocated - if (buffersAllocated()) { - // Check if the current buffers cover the input position - // Released buffers should not be considered when checking if position - // is available - return pos >= bufferOffsetWrtBlockData + - bufferOffsets.get(0) && - pos < bufferOffsetWrtBlockData + buffersSize; + protected synchronized void releaseClient() { + if (xceiverClientFactory != null && xceiverClient != null) { + closeStream(); + xceiverClientFactory.releaseClientForReadData(xceiverClient, false); + xceiverClient = null; } - return false; } - /** - * Check if the buffers have been allocated data and false otherwise. - */ - @VisibleForTesting - protected boolean buffersAllocated() { - return buffers != null && !buffers.isEmpty(); + @Override + public synchronized void close() throws IOException { + releaseClient(); + xceiverClientFactory = null; } - /** - * Adjust the buffers position to account for seeked position and/ or checksum - * boundary reads. - * @param bufferPosition the position to which the buffers must be advanced - */ - private void adjustBufferPosition(long bufferPosition) { - // The bufferPosition is w.r.t the current buffers. - // Adjust the bufferIndex and position to the seeked bufferPosition. - bufferIndex = Collections.binarySearch(bufferOffsets, bufferPosition); - // bufferIndex is negative if bufferPosition isn't found in bufferOffsets - // count (bufferIndex = -bufferIndex - 2) to get bufferPosition is between which offsets. - if (bufferIndex < 0) { - bufferIndex = -bufferIndex - 2; - } + private void refreshBlockInfo(IOException cause) throws IOException { + refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction); + } - buffers.get(bufferIndex).position( - (int) (bufferPosition - bufferOffsets.get(bufferIndex))); - - // Reset buffers > bufferIndex to position 0. We do this to reset any - // previous reads/ seeks which might have updated any buffer position. - // For buffers < bufferIndex, we do not need to reset the position as it - // not required for this read. If a seek was done to a position in the - // previous indices, the buffer position reset would be performed in the - // seek call. - for (int i = bufferIndex + 1; i < buffers.size(); i++) { - buffers.get(i).position(0); + private synchronized void releaseStreamResources(StreamingReadResponse response) { + if (xceiverClient != null) { + xceiverClient.completeStreamRead(response); } - - // Reset the blockPosition as chunk stream has been initialized i.e. the - // buffers have been allocated. - blockPosition = -1; } /** - * Reads full or partial Chunk from DN Container based on the current - * position of the ChunkInputStream, the number of bytes of data to read - * and the checksum boundaries. - * If successful, then the read data in saved in the buffers so that - * subsequent read calls can utilize it. - * @param len number of bytes of data to be read - * @throws IOException if there is an I/O error while performing the call - * to Datanode + * Implementation of a StreamObserver used to received and buffer streaming GRPC reads. */ - private synchronized void readDataFromContainer(int len) throws IOException { - // index of first byte to be read from the block - long startByteIndex; - if (blockPosition >= 0) { - // If seek operation was called to advance the buffer position, the - // chunk should be read from that position onwards. 
- startByteIndex = blockPosition; - } else { - // Start reading the block from the last blockPosition onwards. - startByteIndex = bufferOffsetWrtBlockData + buffersSize; - } + public class StreamingReader implements StreamingReaderSpi { - // bufferOffsetWrtChunkData and buffersSize are updated after the data - // is read from Container and put into the buffers, but if read fails - // and is retried, we need the previous position. Position is reset after - // successful read in adjustBufferPosition() - blockPosition = getPos(); - bufferOffsetWrtBlockData = readData(startByteIndex, len); - long tempOffset = 0L; - buffersSize = 0L; - bufferOffsets = new ArrayList<>(buffers.size()); - for (ByteBuffer buffer : buffers) { - bufferOffsets.add(tempOffset); - tempOffset += buffer.limit(); - buffersSize += buffer.limit(); + private final BlockingQueue responseQueue = new LinkedBlockingQueue<>(1); + private final AtomicBoolean completed = new AtomicBoolean(false); + private final AtomicBoolean failed = new AtomicBoolean(false); + private final AtomicBoolean semaphoreReleased = new AtomicBoolean(false); + private final AtomicReference error = new AtomicReference<>(); + private volatile StreamingReadResponse response; + public boolean hasNext() { + return !responseQueue.isEmpty() || !completed.get(); } - bufferIndex = 0; - allocated = true; - adjustBufferPosition(startByteIndex - bufferOffsetWrtBlockData); - } + public ByteBuffer readNext() throws IOException { + if (failed.get()) { + Throwable cause = error.get(); + throw new IOException("Streaming read failed", cause); + } - @VisibleForTesting - protected long readData(long startByteIndex, long len) - throws IOException { - Pipeline pipeline = pipelineRef.get(); - buffers = new ArrayList<>(); - ReadBlockResponseProto response = - ContainerProtocolCalls.readBlock(xceiverClient, startByteIndex, - len, blockID, validators, tokenRef.get(), pipeline.getReplicaIndexes(), verifyChecksum); - List readBlocks = response.getReadChunkList(); - - for (ReadChunkResponseProto readBlock : readBlocks) { - if (readBlock.hasDataBuffers()) { - buffers.addAll(BufferUtils.getReadOnlyByteBuffers( - readBlock.getDataBuffers().getBuffersList())); - } else { - throw new IOException("Unexpected error while reading chunk data " + - "from container. No data returned."); + if (completed.get() && responseQueue.isEmpty()) { + return null; // Stream ended } - } - return response.getReadChunk(0) - .getChunkData().getOffset(); - } - /** - * Check if the buffers have any data remaining between the current - * position and the limit. 
- */ - private boolean buffersHaveData() { - boolean hasData = false; - if (buffersAllocated()) { - int buffersLen = buffers.size(); - while (bufferIndex < buffersLen) { - ByteBuffer buffer = buffers.get(bufferIndex); - if (buffer != null && buffer.hasRemaining()) { - // current buffer has data - hasData = true; - break; + ReadBlockResponseProto readBlock; + try { + readBlock = responseQueue.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for response", e); + } + if (readBlock == null) { + if (failed.get()) { + Throwable cause = error.get(); + throw new IOException("Streaming read failed", cause); + } else if (completed.get()) { + return null; // Stream ended } else { - if (bufferIndex < buffersLen - 1) { - // move to next available buffer - ++bufferIndex; - Preconditions.checkState(bufferIndex < buffers.size()); - } else { - // no more buffers remaining - break; - } + throw new IOException("Timed out waiting for response"); } } + // The server always returns data starting from the last checksum boundary. Therefore if the reader position is + // ahead of the position we received from the server, we need to adjust the buffer position accordingly. + // If the reader position is behind + ByteBuffer buf = readBlock.getData().asReadOnlyByteBuffer(); + long blockOffset = readBlock.getOffset(); + long pos = getPos(); + if (pos < blockOffset) { + // This should not happen, and if it does, we have a bug. + throw new IOException("Received data out of order. Position is " + pos + " but received data at " + + blockOffset); + } + if (pos > readBlock.getOffset()) { + int offset = (int)(pos - readBlock.getOffset()); + buf.position(offset); + } + return buf; } - return hasData; - } - - /** - * Check if there is more data in the chunk which has not yet been read - * into the buffers. - */ - private boolean dataRemainingInBlock() { - long bufferPos; - if (blockPosition >= 0) { - bufferPos = blockPosition; - } else { - bufferPos = bufferOffsetWrtBlockData + buffersSize; + private void releaseResources() { + boolean wasNotYetComplete = semaphoreReleased.getAndSet(true); + if (wasNotYetComplete) { + releaseStreamResources(response); + } } - return bufferPos < blockLength; - } - - /** - * Check if current buffer had been read till the end. - */ - private boolean bufferEOF() { - return allocated && buffersAllocated() && !buffers.get(bufferIndex).hasRemaining(); - } - - /** - * Release the buffers upto the given index. - * @param releaseUptoBufferIndex bufferIndex (inclusive) upto which the - * buffers must be released - */ - private void releaseBuffers(int releaseUptoBufferIndex) { - int buffersLen = buffers.size(); - if (releaseUptoBufferIndex == buffersLen - 1) { - // Before releasing all the buffers, if block EOF is not reached, then - // blockPosition should be set to point to the last position of the - // buffers. 
This should be done so that getPos() can return the current - // block position - blockPosition = bufferOffsetWrtBlockData + - bufferOffsets.get(releaseUptoBufferIndex) + - buffers.get(releaseUptoBufferIndex).capacity(); - // Release all the buffers - releaseBuffers(); - } else { - buffers = buffers.subList(releaseUptoBufferIndex + 1, buffersLen); - bufferOffsets = bufferOffsets.subList( - releaseUptoBufferIndex + 1, buffersLen); - bufferIndex = 0; + @Override + public void onNext(ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto) { + try { + ReadBlockResponseProto readBlock = containerCommandResponseProto.getReadBlock(); + ByteBuffer data = readBlock.getData().asReadOnlyByteBuffer(); + if (verifyChecksum) { + ChecksumData checksumData = ChecksumData.getFromProtoBuf(readBlock.getChecksumData()); + Checksum.verifyChecksum(data, checksumData, 0); + } + offerToQueue(readBlock); + } catch (OzoneChecksumException e) { + LOG.warn("Checksum verification failed for block {} from datanode {}", + getBlockID(), response.getDatanodeDetails(), e); + cancelDueToError(e); + } } - } - - /** - * If EOF is reached, release the buffers. - */ - private void releaseBuffers() { - buffers = null; - bufferIndex = 0; - // We should not reset bufferOffsetWrtBlockData and buffersSize here - // because when getPos() is called we use these - // values and determine whether chunk is read completely or not. - } - protected synchronized void releaseClient() { - if (xceiverClientFactory != null && xceiverClient != null) { - xceiverClientFactory.releaseClientForReadData(xceiverClient, false); - xceiverClient = null; + @Override + public void onError(Throwable throwable) { + if (throwable instanceof StatusRuntimeException) { + if (((StatusRuntimeException) throwable).getStatus().getCode().name().equals("CANCELLED")) { + // This is expected when the client cancels the stream. + setCompleted(); + } + } else { + setFailed(throwable); + } + releaseResources(); } - } - - private void validateBlock( - ContainerProtos.ContainerCommandResponseProto response - ) throws IOException { - ReadBlockResponseProto readBlock = response.getReadBlock(); - for (ReadChunkResponseProto readChunk : readBlock.getReadChunkList()) { - List byteStrings; + @Override + public void onCompleted() { + setCompleted(); + releaseResources(); + } - ContainerProtos.ChunkInfo chunkInfo = - readChunk.getChunkData(); - if (chunkInfo.getLen() <= 0) { - throw new IOException("Failed to get chunk: chunkName == " - + chunkInfo.getChunkName() + "len == " + chunkInfo.getLen()); - } - byteStrings = readChunk.getDataBuffers().getBuffersList(); - long buffersLen = BufferUtils.getBuffersLen(byteStrings); - if (buffersLen != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new OzoneChecksumException(String.format( - "Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), - buffersLen)); + /** + * By calling cancel, the client will send a cancel signal to the server, which will stop sending more data and + * cause the onError() to be called in this observer with a CANCELLED exception. 
+ */ + public void cancel() { + if (response != null && response.getRequestObserver() != null) { + response.getRequestObserver().cancel("Cancelled by client", CANCELLED_EXCEPTION); + setCompleted(); + releaseResources(); } + } - - if (verifyChecksum) { - ChecksumData checksumData = ChecksumData.getFromProtoBuf( - chunkInfo.getChecksumData()); - int startIndex = (int) readChunk.getChunkData().getOffset() / checksumData.getBytesPerChecksum(); - - // ChecksumData stores checksum for each 'numBytesPerChecksum' - // number of bytes in a list. Compute the index of the first - // checksum to match with the read data - - Checksum.verifyChecksum(byteStrings, checksumData, startIndex); + public void cancelDueToError(Throwable exception) { + if (response != null && response.getRequestObserver() != null) { + response.getRequestObserver().onError(exception); + setFailed(exception); + releaseResources(); } } - } - - @VisibleForTesting - protected synchronized void setBuffers(List buffers) { - this.buffers = buffers; - } - private boolean shouldRetryRead(IOException cause) throws IOException { - RetryPolicy.RetryAction retryAction; - try { - retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(e); + private void setFailed(Throwable throwable) { + if (completed.get()) { + throw new IllegalArgumentException("Cannot mark a completed stream as failed"); + } + failed.set(true); + error.set(throwable); } - return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY; - } - @VisibleForTesting - public boolean isVerifyChecksum() { - return verifyChecksum; - } + private void setCompleted() { + if (!failed.get()) { + completed.set(true); + } + } - private void refreshBlockInfo(IOException cause) throws IOException { - LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}", - blockID, pipelineRef.get().getId(), cause.getMessage()); - if (refreshFunction != null) { - LOG.debug("Re-fetching pipeline and block token for block {}", blockID); - BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID); - if (blockLocationInfo == null) { - LOG.warn("No new block location info for block {}", blockID); - } else { - setPipeline(blockLocationInfo.getPipeline()); - LOG.info("New pipeline for block {}: {}", blockID, - blockLocationInfo.getPipeline()); - - tokenRef.set(blockLocationInfo.getToken()); - if (blockLocationInfo.getToken() != null) { - OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(); - tokenId.readFromByteArray(tokenRef.get().getIdentifier()); - LOG.info("A new token is added for block {}. Expiry: {}", - blockID, Instant.ofEpochMilli(tokenId.getExpiryDate())); + private void offerToQueue(ReadBlockResponseProto item) { + while (!completed.get() && !failed.get()) { + try { + if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } } - } else { - throw cause; } - } - - @VisibleForTesting - public synchronized ByteBuffer[] getCachedBuffers() { - return buffers == null ? null : - BufferUtils.getReadOnlyByteBuffers(buffers.toArray(new ByteBuffer[0])); - } - /** - * Check if this exception is because datanodes are not reachable. 
- */ - private boolean isConnectivityIssue(IOException ex) { - return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); - } - - @Override - public synchronized void close() throws IOException { - releaseClient(); - releaseBuffers(); - xceiverClientFactory = null; - } - - private void handleStorageContainerException(StorageContainerException e) throws IOException { - if (shouldRetryRead(e)) { - releaseClient(); - refreshBlockInfo(e); - } else { - throw e; + @Override + public void setStreamingReadResponse(StreamingReadResponse streamingReadResponse) { + response = streamingReadResponse; } } - private void handleIOException(IOException ioe) throws IOException { - if (shouldRetryRead(ioe)) { - if (isConnectivityIssue(ioe)) { - releaseClient(); - refreshBlockInfo(ioe); - } else { - releaseClient(); - } - } else { - throw ioe; - } - } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java index 9249abaf42a6..2765e2a00a8f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java @@ -90,9 +90,8 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig, blockInfo, xceiverFactory, refreshFunction, ecBlockStreamFactory, config); } else if (config.isStreamReadBlock() && allDataNodesSupportStreamBlock(pipeline)) { - return new StreamBlockInputStream( - blockInfo.getBlockID(), blockInfo.getLength(), - pipeline, token, xceiverFactory, refreshFunction, config); + return new StreamBlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), pipeline, token, xceiverFactory, + refreshFunction, config); } else { return new BlockInputStream(blockInfo, pipeline, token, xceiverFactory, refreshFunction, @@ -104,8 +103,7 @@ private boolean allDataNodesSupportStreamBlock(Pipeline pipeline) { // return true only if all DataNodes in the pipeline are on a version // that supports for reading a block by streaming chunks.. for (DatanodeDetails dn : pipeline.getNodes()) { - if (dn.getCurrentVersion() < - STREAM_BLOCK_SUPPORT.toProtoValue()) { + if (dn.getCurrentVersion() < STREAM_BLOCK_SUPPORT.toProtoValue()) { return false; } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java deleted file mode 100644 index e141845954d3..000000000000 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.storage; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.ozone.common.ChecksumData; -import org.apache.hadoop.ozone.common.utils.BufferUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; - -/** - * A dummy StreamBlockInputStream to mock read block call to DN. - */ -class DummyStreamBlockInputStream extends StreamBlockInputStream { - - private final List readByteBuffers = new ArrayList<>(); - private final List chunks; - private final long[] chunkOffsets; - private final Map chunkDataMap; - - @SuppressWarnings("parameternumber") - DummyStreamBlockInputStream( - BlockID blockId, - long blockLen, - Pipeline pipeline, - Token token, - XceiverClientFactory xceiverClientManager, - Function refreshFunction, - OzoneClientConfig config, - List chunks, - Map chunkDataMap) throws IOException { - super(blockId, blockLen, pipeline, token, xceiverClientManager, - refreshFunction, config); - this.chunks = chunks; - this.chunkDataMap = chunkDataMap; - chunkOffsets = new long[chunks.size()]; - long temp = 0; - for (int i = 0; i < chunks.size(); i++) { - chunkOffsets[i] = temp; - temp += chunks.get(i).getLen(); - } - } - - @Override - protected synchronized void checkOpen() throws IOException { - // No action needed - } - - @Override - protected void acquireClient() { - // No action needed - } - - @Override - protected void releaseClient() { - // no-op - } - - @Override - protected long readData(long offset, long len) { - int chunkIndex = Arrays.binarySearch(chunkOffsets, offset); - if (chunkIndex < 0) { - chunkIndex = -chunkIndex - 2; - } - ChunkInfo chunkInfo = chunks.get(chunkIndex); - readByteBuffers.clear(); - long chunkOffset = offset - chunkInfo.getOffset(); - if (isVerifyChecksum()) { - ChecksumData checksumData = ChecksumData.getFromProtoBuf( - chunkInfo.getChecksumData()); - int bytesPerChecksum = checksumData.getBytesPerChecksum(); - chunkOffset = (chunkOffset / bytesPerChecksum) * bytesPerChecksum; - } - long bufferOffsetWrtBlockDataData = chunkOffsets[chunkIndex] + chunkOffset; - while (len > 0) { - ChunkInfo currentChunk = chunks.get(chunkIndex); - int bufferCapacity = currentChunk.getChecksumData().getBytesPerChecksum(); - long chunkLen = currentChunk.getLen(); - long remainingToRead = Math.min(chunkLen, len); - if (isVerifyChecksum()) { - if (len < chunkLen) { - final ChecksumData checksumData = ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - final long endByteIndex = len - 1; - final int bytesPerChecksum = checksumData.getBytesPerChecksum(); - remainingToRead = (endByteIndex / bytesPerChecksum + 1) * bytesPerChecksum; - } else { - remainingToRead = chunkLen; - } - } - - long bufferLen; - while (remainingToRead > 0) { - if (remainingToRead < bufferCapacity) { - bufferLen = remainingToRead; - } else { - bufferLen = bufferCapacity; - } - ByteString byteString 
= ByteString.copyFrom(chunkDataMap.get(chunks.get(chunkIndex).getChunkName()), - (int) chunkOffset, (int) bufferLen); - - readByteBuffers.add(byteString); - - chunkOffset += bufferLen; - remainingToRead -= bufferLen; - len -= bufferLen; - } - chunkOffset = 0; - chunkIndex++; - } - setBuffers(BufferUtils.getReadOnlyByteBuffers(readByteBuffers)); - return bufferOffsetWrtBlockDataData; - } - - public List getReadByteBuffers() { - return readByteBuffers; - } -} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 2e9a84cad429..1b5e7667e845 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -267,8 +267,8 @@ public void testSeekAndRead() throws Exception { @Test public void testRefreshPipelineFunction() throws Exception { - LogCapturer logCapturer = LogCapturer.captureLogs(BlockInputStream.class); - GenericTestUtils.setLogLevel(BlockInputStream.class, Level.DEBUG); + LogCapturer logCapturer = LogCapturer.captureLogs(BlockExtendedInputStream.class); + GenericTestUtils.setLogLevel(BlockExtendedInputStream.class, Level.DEBUG); BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); AtomicBoolean isRefreshed = new AtomicBoolean(); createChunkList(5); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java index 81cb6d4d62c6..b2cb3fb865cb 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -17,248 +17,338 @@ package org.apache.hadoop.hdds.scm.storage; -import static org.assertj.core.api.Assertions.assertThat; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; - -import com.google.common.primitives.Bytes; -import java.io.EOFException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.stream.Stream; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; -import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamingReadResponse; +import org.apache.hadoop.hdds.scm.StreamingReaderSpi; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Time; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusException; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.invocation.InvocationOnMock; /** * Tests for {@link TestStreamBlockInputStream}'s functionality. */ public class TestStreamBlockInputStream { - private int blockSize; - private static final int CHUNK_SIZE = 100; - private static final int BYTES_PER_CHECKSUM = 20; - private static final Random RANDOM = new Random(); - private DummyStreamBlockInputStream blockStream; - private byte[] blockData; - private List chunks; - private Map chunkDataMap; + private static final int BYTES_PER_CHECKSUM = 1024; + private static final int BLOCK_SIZE = 1024; + private StreamBlockInputStream blockStream; + private final OzoneConfiguration conf = new OzoneConfiguration(); + private XceiverClientFactory xceiverClientFactory; + private XceiverClientGrpc xceiverClient; private Checksum checksum; - private BlockID blockID; - private static final String CHUNK_NAME = "chunk-"; - private OzoneConfiguration conf = new OzoneConfiguration(); + private ChecksumData checksumData; + private byte[] data; + private ClientCallStreamObserver requestObserver; + private Function refreshFunction; @BeforeEach public void setup() throws Exception { + Token token = mock(Token.class); + when(token.encodeToUrlString()).thenReturn("url"); + + Set modes = + Collections.singleton(HddsProtos.BlockTokenSecretProto.AccessModeProto.READ); + OzoneBlockTokenIdentifier tokenIdentifier = new OzoneBlockTokenIdentifier("owner", new BlockID(1, 1), + modes, Time.monotonicNow() + 10000, 10); + tokenIdentifier.setSecretKeyId(UUID.randomUUID()); + when(token.getIdentifier()).thenReturn(tokenIdentifier.getBytes()); + Pipeline pipeline = MockPipeline.createSingleNodePipeline(); + + BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class); + when(blockLocationInfo.getPipeline()).thenReturn(pipeline); + when(blockLocationInfo.getToken()).thenReturn(token); + + xceiverClient = mock(XceiverClientGrpc.class); + when(xceiverClient.getPipeline()).thenReturn(pipeline); + xceiverClientFactory = mock(XceiverClientFactory.class); 
+ when(xceiverClientFactory.acquireClientForReadData(any())) + .thenReturn(xceiverClient); + requestObserver = mock(ClientCallStreamObserver.class); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setStreamReadBlock(true); - blockID = new BlockID(new ContainerBlockID(1, 1)); + clientConfig.setMaxReadRetryCount(1); + refreshFunction = mock(Function.class); + when(refreshFunction.apply(any())).thenReturn(blockLocationInfo); + BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM); - createChunkList(5); - - Pipeline pipeline = MockPipeline.createSingleNodePipeline(); - blockStream = new DummyStreamBlockInputStream(blockID, blockSize, pipeline, - null, null, mock(Function.class), clientConfig, chunks, chunkDataMap); + createDataAndChecksum(); + blockStream = new StreamBlockInputStream(blockID, BLOCK_SIZE, pipeline, + token, xceiverClientFactory, refreshFunction, clientConfig); } - /** - * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE - * and the last chunk with length CHUNK_SIZE/2. - */ - private void createChunkList(int numChunks) - throws Exception { - - chunks = new ArrayList<>(numChunks); - chunkDataMap = new HashMap<>(); - blockData = new byte[0]; - int i, chunkLen; - byte[] byteData; - String chunkName; - - for (i = 0; i < numChunks; i++) { - chunkName = CHUNK_NAME + i; - chunkLen = CHUNK_SIZE; - if (i == numChunks - 1) { - chunkLen = CHUNK_SIZE / 2; + @AfterEach + public void teardown() { + if (blockStream != null) { + try { + blockStream.close(); + } catch (IOException e) { + // ignore } - byteData = generateRandomData(chunkLen); - ChunkInfo chunkInfo = ChunkInfo.newBuilder() - .setChunkName(chunkName) - .setOffset(0) - .setLen(chunkLen) - .setChecksumData(checksum.computeChecksum( - byteData, 0, chunkLen).getProtoBufMessage()) - .build(); - - chunkDataMap.put(chunkName, byteData); - chunks.add(chunkInfo); - - blockSize += chunkLen; - blockData = Bytes.concat(blockData, byteData); } } - static byte[] generateRandomData(int length) { - byte[] bytes = new byte[length]; - RANDOM.nextBytes(bytes); - return bytes; + @Test + public void testCloseStreamReleasesResources() throws IOException, InterruptedException { + setupSuccessfulRead(); + assertEquals(data[0], blockStream.read()); + blockStream.close(); + // Verify that cancel() was called on the requestObserver mock + verify(requestObserver).cancel(any(), any()); + // Verify that release() was called on the xceiverClient mock + verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false); + verify(xceiverClient, times(1)).completeStreamRead(any()); } - /** - * Match readData with the chunkData byte-wise. 
-   * @param readData Data read through ChunkInputStream
-   * @param inputDataStartIndex first index (inclusive) in chunkData to compare
-   *                            with read data
-   * @param length the number of bytes of data to match starting from
-   *               inputDataStartIndex
-   */
-  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
-      int length) {
-    for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
-      assertEquals(blockData[i], readData[i - inputDataStartIndex], "i: " + i);
-    }
+  @Test
+  public void testUnbufferReleasesResourcesAndResumesFromLastPosition() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertEquals(data[0], blockStream.read());
+    assertEquals(1, blockStream.getPos());
+    blockStream.unbuffer();
+    // Verify that cancel() was called on the requestObserver mock
+    verify(requestObserver).cancel(any(), any());
+    // Verify that release() was called on the xceiverClient mock
+    verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false);
+    verify(xceiverClient, times(1)).completeStreamRead(any());
+    // The next read should "rebuffer" and continue from the last position
+    assertEquals(data[1], blockStream.read());
+    assertEquals(2, blockStream.getPos());
   }

-  private void matchWithInputData(List byteStrings,
-      int inputDataStartIndex, int length) {
-    int offset = inputDataStartIndex;
-    int totalBufferLen = 0;
-    for (ByteString byteString : byteStrings) {
-      int bufferLen = byteString.size();
-      matchWithInputData(byteString.toByteArray(), offset, bufferLen);
-      offset += bufferLen;
-      totalBufferLen += bufferLen;
-    }
-    assertEquals(length, totalBufferLen);
+  @Test
+  public void testSeekReleasesTheStreamAndStartsFromNewPosition() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertEquals(data[0], blockStream.read());
+    blockStream.seek(100);
+    assertEquals(100, blockStream.getPos());
+    // Verify that cancel() was called on the requestObserver mock
+    verify(requestObserver).cancel(any(), any());
+    verify(xceiverClient, times(1)).completeStreamRead(any());
+    // The xceiverClient should not be released
+    verify(xceiverClientFactory, never())
+        .releaseClientForReadData(xceiverClient, false);
+
+    assertEquals(data[100], blockStream.read());
+    assertEquals(101, blockStream.getPos());
   }

-  /**
-   * Seek to a position and verify through getPos().
-   */
-  private void seekAndVerify(int pos) throws Exception {
-    blockStream.seek(pos);
-    assertEquals(pos, blockStream.getPos(),
-        "Current position of buffer does not match with the sought position");
+  @Test
+  public void testErrorThrownIfStreamReturnsError() throws IOException, InterruptedException {
+    // Note the error will only be thrown when the buffer needs to be refilled. In this case, as it is the first
+    // read, it will try to fill the buffer and encounter the error, but a reader could continue reading until the
+    // buffer is exhausted before seeing the error.
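+    // The mocked streamRead below establishes the stream and then delivers onError instead of data, so the
+    // first attempt to fill the buffer surfaces the failure.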
+ doAnswer((InvocationOnMock invocation) -> { + StreamingReaderSpi streamObserver = invocation.getArgument(1); + StreamingReadResponse resp = + new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + streamObserver.setStreamingReadResponse(resp); + streamObserver.onError(new IOException("Test induced error")); + return null; + }).when(xceiverClient).streamRead(any(), any()); + assertThrows(IOException.class, () -> blockStream.read()); + verify(xceiverClient, times(0)).completeStreamRead(any()); } @Test - public void testFullChunkRead() throws Exception { - byte[] b = new byte[blockSize]; - int numBytesRead = blockStream.read(b, 0, blockSize); - assertEquals(blockSize, numBytesRead); - matchWithInputData(b, 0, blockSize); + public void seekOutOfBounds() throws IOException, InterruptedException { + setupSuccessfulRead(); + assertThrows(IOException.class, () -> blockStream.seek(-1)); + assertThrows(IOException.class, () -> blockStream.seek(BLOCK_SIZE + 1)); } @Test - public void testPartialChunkRead() throws Exception { - int len = blockSize / 2; - byte[] b = new byte[len]; - - int numBytesRead = blockStream.read(b, 0, len); - assertEquals(len, numBytesRead); - matchWithInputData(b, 0, len); - - // To read block data from index 0 to 225 (len = 225), we need to read - // chunk from offset 0 to 240 as the checksum boundary is at every 20 - // bytes. Verify that 60 bytes of chunk data are read and stored in the - // buffers. Since checksum boundary is at every 20 bytes, there should be - // 240/20 number of buffers. - matchWithInputData(blockStream.getReadByteBuffers(), 0, 240); + public void readPastEOFReturnsEOF() throws IOException, InterruptedException { + setupSuccessfulRead(); + blockStream.seek(BLOCK_SIZE); + // Ensure the stream is at EOF even after two attempts to read + assertEquals(-1, blockStream.read()); + assertEquals(-1, blockStream.read()); + assertEquals(BLOCK_SIZE, blockStream.getPos()); } @Test - public void testSeek() throws Exception { - seekAndVerify(0); - EOFException eofException = assertThrows(EOFException.class, () -> seekAndVerify(blockSize + 1)); - assertThat(eofException).hasMessage("EOF encountered at pos: " + (blockSize + 1) + " for block: " + blockID); - - // Seek before read should update the BlockInputStream#blockPosition - seekAndVerify(25); - - // Read from the sought position. - // Reading from index 25 to 54 should result in the BlockInputStream - // copying chunk data from index 20 to 59 into the buffers (checksum - // boundaries). - byte[] b = new byte[30]; - int numBytesRead = blockStream.read(b, 0, 30); - assertEquals(30, numBytesRead); - matchWithInputData(b, 25, 30); - matchWithInputData(blockStream.getReadByteBuffers(), 20, 40); - - // After read, the position of the blockStream is evaluated from the - // buffers and the chunkPosition should be reset to -1. - - // Only the last BYTES_PER_CHECKSUM will be cached in the buffers as - // buffers are released after each checksum boundary is read. So the - // buffers should contain data from index 40 to 59. - // Seek to a position within the cached buffers. BlockPosition should - // still not be used to set the position. - seekAndVerify(45); - - // Seek to a position outside the current cached buffers. In this case, the - // chunkPosition should be updated to the seeked position. - seekAndVerify(75); - - // Read upto checksum boundary should result in all the buffers being - // released and hence chunkPosition updated with current position of chunk. 
-    seekAndVerify(25);
-    b = new byte[15];
-    numBytesRead = blockStream.read(b, 0, 15);
-    assertEquals(15, numBytesRead);
-    matchWithInputData(b, 25, 15);
+  public void ensureExceptionThrownForReadAfterClosed() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    blockStream.close();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+    byte[] byteArray = new byte[10];
+    assertThrows(IOException.class, () -> blockStream.read());
+    assertThrows(IOException.class, () -> {
+      // Findbugs complains about ignored return value without this :(
+      int r = blockStream.read(byteArray, 0, 10);
+    });
+    assertThrows(IOException.class, () -> blockStream.read(byteBuffer));
+    assertThrows(IOException.class, () -> blockStream.seek(10));
+  }
+
+  @ParameterizedTest
+  @MethodSource("exceptionsTriggeringRefresh")
+  public void testRefreshFunctionCalledForAllDNsBadOnInitialize(IOException thrown)
+      throws IOException, InterruptedException {
+    // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline
+
+    doAnswer((InvocationOnMock invocation) -> {
+      throw thrown;
+    }).doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      StreamingReadResponse resp =
+          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
+      streamObserver.setStreamingReadResponse(resp);
+      streamObserver.onNext(createChunkResponse(false));
+      streamObserver.onCompleted();
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+    blockStream.read();
+    verify(refreshFunction, times(1)).apply(any());
+  }
+
+  @ParameterizedTest
+  @MethodSource("exceptionsNotTriggeringRefresh")
+  public void testRefreshNotCalledForAllDNsBadOnInitialize(IOException thrown)
+      throws IOException, InterruptedException {
+    // In this case, the thrown exception does not indicate a stale pipeline, so no pipeline refresh should be attempted
+    doAnswer((InvocationOnMock invocation) -> {
+      throw thrown;
+    }).when(xceiverClient).streamRead(any(), any());
+    assertThrows(IOException.class, () -> blockStream.read());
+    verify(refreshFunction, times(0)).apply(any());
   }

   @Test
-  public void testSeekAndRead() throws Exception {
-    // Seek to a position and read data
-    seekAndVerify(50);
-    byte[] b1 = new byte[20];
-    int numBytesRead = blockStream.read(b1, 0, 20);
-    assertEquals(20, numBytesRead);
-    matchWithInputData(b1, 50, 20);
-
-    // Next read should start from the position of the last read + 1 i.e.
70 - byte[] b2 = new byte[20]; - numBytesRead = blockStream.read(b2, 0, 20); - assertEquals(20, numBytesRead); - matchWithInputData(b2, 70, 20); - - byte[] b3 = new byte[20]; - seekAndVerify(80); - numBytesRead = blockStream.read(b3, 0, 20); - assertEquals(20, numBytesRead); - matchWithInputData(b3, 80, 20); + public void testExceptionThrownAfterRetriesExhausted() throws IOException, InterruptedException { + // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline + doAnswer((InvocationOnMock invocation) -> { + throw new StorageContainerException(CONTAINER_NOT_FOUND); + }).when(xceiverClient).streamRead(any(), any()); + + assertThrows(IOException.class, () -> blockStream.read()); + verify(refreshFunction, times(1)).apply(any()); } @Test - public void testUnbuffered() throws Exception { - byte[] b1 = new byte[20]; - int numBytesRead = blockStream.read(b1, 0, 20); - assertEquals(20, numBytesRead); - matchWithInputData(b1, 0, 20); + public void testInvalidChecksumThrowsException() throws IOException, InterruptedException { + doAnswer((InvocationOnMock invocation) -> { + StreamingReaderSpi streamObserver = invocation.getArgument(1); + StreamingReadResponse resp = + new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + streamObserver.setStreamingReadResponse(resp); + streamObserver.onNext(createChunkResponse(true)); + streamObserver.onCompleted(); + return null; + }).when(xceiverClient).streamRead(any(), any()); + assertThrows(IOException.class, () -> blockStream.read()); + } - blockStream.unbuffer(); + private void createDataAndChecksum() throws OzoneChecksumException { + data = new byte[BLOCK_SIZE]; + new SecureRandom().nextBytes(data); + checksumData = checksum.computeChecksum(data); + } + + private void setupSuccessfulRead() throws IOException, InterruptedException { + doAnswer((InvocationOnMock invocation) -> { + StreamingReaderSpi streamObserver = invocation.getArgument(1); + StreamingReadResponse resp = + new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + streamObserver.setStreamingReadResponse(resp); + streamObserver.onNext(createChunkResponse(false)); + streamObserver.onCompleted(); + return null; + }).when(xceiverClient).streamRead(any(), any()); + } + + private ContainerProtos.ContainerCommandResponseProto createChunkResponse(boolean invalidChecksum) { + ContainerProtos.ReadBlockResponseProto response = invalidChecksum ? 
+ createInValidChecksumResponse() : createValidResponse(); - assertFalse(blockStream.buffersAllocated()); + return ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadBlock) + .setReadBlock(response) + .setResult(ContainerProtos.Result.SUCCESS) + .build(); + } + + private ContainerProtos.ReadBlockResponseProto createValidResponse() { + return ContainerProtos.ReadBlockResponseProto.newBuilder() + .setChecksumData(checksumData.getProtoBufMessage()) + .setData(ByteString.copyFrom(data)) + .setOffset(0) + .build(); + } + + private ContainerProtos.ReadBlockResponseProto createInValidChecksumResponse() { + byte[] invalidData = new byte[data.length]; + System.arraycopy(data, 0, invalidData, 0, data.length); + // Corrupt the data + invalidData[0] = (byte) (invalidData[0] + 1); + return ContainerProtos.ReadBlockResponseProto.newBuilder() + .setChecksumData(checksumData.getProtoBufMessage()) + .setData(ByteString.copyFrom(invalidData)) + .setOffset(0) + .build(); + } + + private static Stream exceptionsTriggeringRefresh() { + return Stream.of( + Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)), + Arguments.of(new IOException(new ExecutionException( + new StatusException(Status.UNAVAILABLE)))) + ); + } - // Next read should start from the position of the last read + 1 i.e. 20 - byte[] b2 = new byte[20]; - numBytesRead = blockStream.read(b2, 0, 20); - assertEquals(20, numBytesRead); - matchWithInputData(b2, 20, 20); + private static Stream exceptionsNotTriggeringRefresh() { + return Stream.of( + Arguments.of(new SCMSecurityException("Security problem")), + Arguments.of(new OzoneChecksumException("checksum missing")), + Arguments.of(new IOException("Some random exception.")) + ); } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java new file mode 100644 index 000000000000..ea8694cd8b78 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; + +/** + * Streaming read response holding datanode details and + * request observer to send read requests. 
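+ * The request observer can be used to manage the stream, for example to cancel it if the read is abandoned
+ * before the block has been fully streamed.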
+ */ +public class StreamingReadResponse { + + private final DatanodeDetails dn; + private final ClientCallStreamObserver requestObserver; + + public StreamingReadResponse(DatanodeDetails dn, + ClientCallStreamObserver requestObserver) { + this.dn = dn; + this.requestObserver = requestObserver; + } + + public DatanodeDetails getDatanodeDetails() { + return dn; + } + + public ClientCallStreamObserver getRequestObserver() { + return requestObserver; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java new file mode 100644 index 000000000000..0206253784cf --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; + +/** + * SPI for streaming reader to set the streaming read response. 
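+ * Implementations receive the streamed read responses through the StreamObserver callbacks, and are handed the
+ * StreamingReadResponse for the stream once it has been established.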
+ */ +public interface StreamingReaderSpi extends StreamObserver { + + void setStreamingReadResponse(StreamingReadResponse streamingReadResponse); + +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 35e65271bb1c..f1bf7a8ef855 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -144,6 +144,15 @@ public ContainerCommandResponseProto sendCommand( } } + public void streamRead(ContainerCommandRequestProto request, + StreamingReaderSpi streamObserver) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Stream read is not supported"); + } + + public void completeStreamRead(StreamingReadResponse streamingReadResponse) { + throw new UnsupportedOperationException("Stream read is not supported"); + } + public static IOException getIOExceptionForSendCommand( ContainerCommandRequestProto request, Exception e) { return new IOException("Failed to execute command " diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java index ededa8d070b6..3fc97e97ffaa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBufferToByteString; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; @@ -335,19 +336,16 @@ public static ContainerCommandResponseProto getReadChunkResponse( } public static ContainerCommandResponseProto getReadBlockResponse( - ContainerCommandRequestProto request, DatanodeBlockID blockID, - ChunkInfo chunkInfo, ChunkBufferToByteString data, - Function byteBufferToByteString) { + ContainerCommandRequestProto request, ChecksumData checksumData, ByteBuffer data, long offset) { + + ContainerProtos.ReadBlockResponseProto response = ContainerProtos.ReadBlockResponseProto.newBuilder() + .setChecksumData(checksumData.getProtoBufMessage()) + .setData(ByteString.copyFrom(data)) + .setOffset(offset) + .build(); - ReadChunkResponseProto.Builder response; - response = ReadChunkResponseProto.newBuilder() - .setChunkData(chunkInfo) - .setDataBuffers(DataBuffers.newBuilder() - .addAllBuffers(data.toByteStringList(byteBufferToByteString)) - .build()) - .setBlockID(blockID); return getSuccessResponseBuilder(request) - .setReadChunk(response) + .setReadBlock(response) .build(); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 1117d29c1aa3..c6e5d75b5caf 100644 --- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -56,13 +56,13 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.hdds.scm.StreamingReaderSpi; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; @@ -911,23 +911,18 @@ public static List toValidatorList(Validator validator) { * * @param xceiverClient client to perform call * @param offset offset where block starts - * @param len length of data to read * @param blockID ID of the block - * @param validators functions to validate the response * @param token a token for this block (may be null) - * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ @SuppressWarnings("checkstyle:ParameterNumber") - public static ContainerProtos.ReadBlockResponseProto readBlock( - XceiverClientSpi xceiverClient, long offset, long len, BlockID blockID, - List validators, Token token, - Map replicaIndexes, boolean verifyChecksum) throws IOException { + public static void readBlock( + XceiverClientSpi xceiverClient, long offset, BlockID blockID, Token token, + Map replicaIndexes, StreamingReaderSpi streamObserver) + throws IOException, InterruptedException { final ReadBlockRequestProto.Builder readBlockRequest = ReadBlockRequestProto.newBuilder() - .setOffset(offset) - .setVerifyChecksum(verifyChecksum) - .setLen(len); + .setOffset(offset); final ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock) .setContainerID(blockID.getContainerID()); @@ -935,18 +930,14 @@ public static ContainerProtos.ReadBlockResponseProto readBlock( builder.setEncodedToken(token.encodeToUrlString()); } - return tryEachDatanode(xceiverClient.getPipeline(), - d -> readBlock(xceiverClient, - validators, blockID, builder, readBlockRequest, d, replicaIndexes), - d -> toErrorMessage(blockID, d)); + readBlock(xceiverClient, blockID, builder, readBlockRequest, xceiverClient.getPipeline().getFirstNode(), + replicaIndexes, streamObserver); } - private static ReadBlockResponseProto readBlock(XceiverClientSpi xceiverClient, - List validators, BlockID blockID, - ContainerCommandRequestProto.Builder builder, - ReadBlockRequestProto.Builder readBlockBuilder, - DatanodeDetails datanode, - Map replicaIndexes) throws IOException { + private static void readBlock(XceiverClientSpi xceiverClient, + BlockID blockID, 
ContainerCommandRequestProto.Builder builder, ReadBlockRequestProto.Builder readBlockBuilder, + DatanodeDetails datanode, Map replicaIndexes, + StreamingReaderSpi streamObserver) throws IOException, InterruptedException { final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); int replicaIndex = replicaIndexes.getOrDefault(datanode, 0); if (replicaIndex > 0) { @@ -956,8 +947,6 @@ private static ReadBlockResponseProto readBlock(XceiverClientSpi xceiverClient, final ContainerCommandRequestProto request = builder .setDatanodeUuid(datanode.getUuidString()) .setReadBlock(readBlockBuilder).build(); - ContainerCommandResponseProto response = - xceiverClient.sendCommand(request, validators); - return response.getReadBlock(); + xceiverClient.streamRead(request, streamObserver); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index e6be4a490b43..6dba6abf9d09 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -818,8 +818,8 @@ public StateMachine.DataChannel getStreamDataChannel( @Override public void streamDataReadOnly(ContainerCommandRequestProto msg, - StreamObserver streamObserver, - DispatcherContext dispatcherContext) { + StreamObserver streamObserver, + DispatcherContext dispatcherContext) { Type cmdType = msg.getCmdType(); String traceID = msg.getTraceID(); Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID); @@ -829,8 +829,7 @@ public void streamDataReadOnly(ContainerCommandRequestProto msg, try (UncheckedAutoCloseable ignored = protocolMetrics.measure(cmdType)) { Preconditions.checkNotNull(msg); if (LOG.isTraceEnabled()) { - LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(), - traceID); + LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(), traceID); } PerformanceStringBuilder perf = new PerformanceStringBuilder(); @@ -849,20 +848,17 @@ public void streamDataReadOnly(ContainerCommandRequestProto msg, ContainerProtos.Result.CONTAINER_MISSING); } if (container == null) { - throw new StorageContainerException( - "ContainerID " + containerID + " does not exist", + throw new StorageContainerException("ContainerID " + containerID + " does not exist", ContainerProtos.Result.CONTAINER_NOT_FOUND); } ContainerType containerType = getContainerType(container); Handler handler = getHandler(containerType); if (handler == null) { - throw new StorageContainerException("Invalid " + - "ContainerType " + containerType, + throw new StorageContainerException("Invalid " + "ContainerType " + containerType, ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); } perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime); - responseProto = handler.readBlock( - msg, container, dispatcherContext, streamObserver); + responseProto = handler.readBlock(msg, container, dispatcherContext, streamObserver); long oPLatencyMS = Time.monotonicNow() - startTime; metrics.incContainerOpsLatencies(cmdType, oPLatencyMS); if (responseProto == null) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java index d6c334550080..8a4a675187df 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java @@ -98,7 +98,6 @@ default void streamDataReadOnly( ContainerCommandRequestProto msg, StreamObserver streamObserver, DispatcherContext dispatcherContext) { - throw new UnsupportedOperationException( - "streamDataReadOnly not supported."); + throw new UnsupportedOperationException("streamDataReadOnly not supported."); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index ecf13cf12afa..2f4177b2d065 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -59,6 +59,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST; import static org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline; import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -69,7 +70,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; @@ -77,6 +80,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -88,7 +92,6 @@ import java.util.concurrent.locks.Lock; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; @@ -124,6 +127,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.ChunkBufferToByteString; import org.apache.hadoop.ozone.common.OzoneChecksumException; @@ -176,6 +180,7 @@ public class KeyValueHandler extends Handler { private static final Logger LOG = LoggerFactory.getLogger( KeyValueHandler.class); + private static final int STREAMING_BYTES_PER_CHUNK = 1024 * 64; private final BlockManager blockManager; private final ChunkManager chunkManager; @@ -2055,110 +2060,81 @@ public ContainerCommandResponseProto readBlock( ContainerCommandRequestProto request, Container kvContainer, DispatcherContext dispatcherContext, StreamObserver streamObserver) { + + if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) { + return 
ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Only File Per Block is supported", IO_EXCEPTION), request);
+    }
+
     ContainerCommandResponseProto responseProto = null;
     if (!request.hasReadBlock()) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Malformed Read Block request. trace ID: {}",
-            request.getTraceID());
+        LOG.debug("Malformed Read Block request. trace ID: {}", request.getTraceID());
       }
       return malformedRequest(request);
     }
     try {
       ReadBlockRequestProto readBlock = request.getReadBlock();
-      BlockID blockID = BlockID.getFromProtobuf(
-          readBlock.getBlockID());
+      BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
       // This is a new api the block should always be checked.
       BlockUtils.verifyReplicaIdx(kvContainer, blockID);
       BlockUtils.verifyBCSId(kvContainer, blockID);
+      File blockFile = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
+
       BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
       List chunkInfos = blockData.getChunks();
-      long blockOffset = 0;
-      int chunkIndex = -1;
-      long chunkOffset = 0;
-      long offset = readBlock.getOffset();
-      for (int i = 0; i < chunkInfos.size(); i++) {
-        final long chunkLen = chunkInfos.get(i).getLen();
-        blockOffset += chunkLen;
-        if (blockOffset > offset) {
-          chunkIndex = i;
-          chunkOffset = offset - blockOffset + chunkLen;
-          break;
-        }
-      }
-      Preconditions.checkState(chunkIndex >= 0);
-
-      if (dispatcherContext == null) {
-        dispatcherContext = DispatcherContext.getHandleReadBlock();
-      }
-
-      ChunkBufferToByteString data;
+      // To get the chunk size, check the first chunk. Either there is only 1 chunk and it is the largest, or there
+      // are multiple chunks and they are all the same size except the last one.
+      long bytesPerChunk = chunkInfos.get(0).getLen();
+      // The bytes per checksum is stored in the checksum data of each chunk, so check the first chunk as they all
+      // must be the same.
+      ContainerProtos.ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType();
+      ChecksumData checksumData = null;
+      int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
+      if (checksumType == ContainerProtos.ChecksumType.NONE) {
+        checksumData = new ChecksumData(checksumType, 0);
+      } else {
+        bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
+      }
+      // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to
+      // the previous checksum boundary.
+      // E.g. if bytesPerChecksum is 512 and the requested offset is 600, we have to move back to 512.
+      // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice
+      // and it simplifies the code to always do this.
+      long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % bytesPerChecksum;
+      try (RandomAccessFile file = new RandomAccessFile(blockFile, "r");
+          FileChannel channel = file.getChannel()) {
+        ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
+        channel.position(adjustedOffset);
+        while (channel.read(buffer) != -1) {
+          buffer.flip();
+          if (checksumType != ContainerProtos.ChecksumType.NONE) {
+            // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from,
+            // and its offset to pull out the correct checksum bytes for each read.
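+            // For example (hypothetical values): with bytesPerChunk = 4 MB and bytesPerChecksum = 64 KB, an
+            // adjustedOffset of 4 MB + 128 KB falls in chunkIndex 1 at chunkOffset 128 KB, giving checksumIndex 2.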
+ int chunkIndex = (int) (adjustedOffset / bytesPerChunk); + int chunkOffset = (int) (adjustedOffset % bytesPerChunk); + int checksumIndex = chunkOffset / bytesPerChecksum; + ByteString checksum = blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex); + checksumData = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum)); + } + streamObserver.onNext(getReadBlockResponse(request, checksumData, buffer, adjustedOffset)); + buffer.clear(); - long len = readBlock.getLen(); - long adjustedChunkOffset, adjustedChunkLen; - do { - ContainerProtos.ChunkInfo chunk = chunkInfos.get(chunkIndex); - if (readBlock.getVerifyChecksum()) { - Pair adjustedOffsetAndLength = - computeChecksumBoundaries(chunk, chunkOffset, len); - adjustedChunkOffset = adjustedOffsetAndLength.getLeft(); - adjustedChunkLen = adjustedOffsetAndLength.getRight(); - adjustedChunkOffset += chunk.getOffset(); - } else { - adjustedChunkOffset = chunkOffset; - adjustedChunkLen = Math.min( - chunk.getLen() + chunk.getOffset() - chunkOffset, len); + adjustedOffset += bytesPerChecksum; } - - ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf( - ContainerProtos.ChunkInfo.newBuilder(chunk) - .setOffset(adjustedChunkOffset) - .setLen(adjustedChunkLen).build()); - BlockUtils.verifyReplicaIdx(kvContainer, blockID); - BlockUtils.verifyBCSId(kvContainer, blockID); - data = getChunkManager().readChunk( - kvContainer, blockID, chunkInfo, dispatcherContext); - - Preconditions.checkNotNull(data, "Chunk data is null"); - streamObserver.onNext( - getReadBlockResponse(request, - blockData.getProtoBufMessage().getBlockID(), - chunkInfo.getProtoBufMessage(), - data, byteBufferToByteString)); - len -= adjustedChunkLen + adjustedChunkOffset - chunkOffset; - chunkOffset = 0; - chunkIndex++; - } while (len > 0); - - metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen()); + } + // TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen()); } catch (StorageContainerException ex) { responseProto = ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ioe) { responseProto = ContainerUtils.logAndReturnError(LOG, - new StorageContainerException("Read Block failed", ioe, IO_EXCEPTION), - request); + new StorageContainerException("Read Block failed", ioe, IO_EXCEPTION), request); } return responseProto; } - private Pair computeChecksumBoundaries( - ContainerProtos.ChunkInfo chunkInfo, long startByteIndex, long dataLen) { - - int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum(); - - // index of the last byte to be read from chunk, inclusively. 
- final long endByteIndex = startByteIndex + dataLen - 1; - - long adjustedChunkOffset = (startByteIndex / bytesPerChecksum) - * bytesPerChecksum; // inclusive - final long endIndex = ((endByteIndex / bytesPerChecksum) + 1) - * bytesPerChecksum; // exclusive - long adjustedChunkLen = - Math.min(endIndex, chunkInfo.getLen()) - adjustedChunkOffset; - return Pair.of(adjustedChunkOffset, adjustedChunkLen); - } - @Override public void addFinalizedBlock(Container container, long localID) { KeyValueContainer keyValueContainer = (KeyValueContainer)container; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index efede65e524d..6afee1c5d77f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -42,10 +42,8 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMostOnce; -import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -55,11 +53,9 @@ import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; -import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -72,29 +68,22 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.token.TokenVerifier; -import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import 
org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; -import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; @@ -105,28 +94,22 @@ import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.GenericTestUtils.LogCapturer; -import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,10 +127,7 @@ public class TestKeyValueHandler { private Path dbFile; private static final long DUMMY_CONTAINER_ID = 9999; - private static final long LOCAL_ID = 1; private static final String DUMMY_PATH = "dummy/dir/doesnt/exist"; - private static final long CHUNK_SIZE = 1024 * 1024; // 1MB - private static final long BYTES_PER_CHECKSUM = 256 * 1024; private static final String DATANODE_UUID = UUID.randomUUID().toString(); private static final String CLUSTER_ID = UUID.randomUUID().toString(); @@ -958,77 +938,4 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException { return kvHandler; } - - @Test - public void testReadBlock() throws IOException { - - StreamObserver streamObserver = mock(StreamObserver.class); - KeyValueContainer container = mock(KeyValueContainer.class); - final KeyValueHandler kvHandler = new KeyValueHandler(new OzoneConfiguration(), - UUID.randomUUID().toString(), mock(ContainerSet.class), mock(VolumeSet.class), mock(ContainerMetrics.class), - mock(IncrementalReportSender.class), mock(ContainerChecksumTreeManager.class)); - final KeyValueHandler keyValueHandler = spy(kvHandler); - DispatcherContext dispatcherContext = mock(DispatcherContext.class); - - List chunkInfoList = new ArrayList<>(); - BlockData blockData = new BlockData(new BlockID(1, 1)); - for (int i = 0; i < 4; i++) { - 
chunkInfoList.add(ContainerProtos.ChunkInfo - .newBuilder() - .setOffset(0) - .setLen(CHUNK_SIZE) - .setChecksumData( - ChecksumData.newBuilder().setBytesPerChecksum((int) BYTES_PER_CHECKSUM) - .setType(ChecksumType.CRC32).build()) - .setChunkName("chunkName" + i) - .build()); - } - blockData.setChunks(chunkInfoList); - - try (MockedStatic blockUtils = mockStatic(BlockUtils.class)) { - BlockManager blockManager = mock(BlockManager.class); - ChunkManager chunkManager = mock(ChunkManager.class); - when(keyValueHandler.getBlockManager()).thenReturn(blockManager); - when(keyValueHandler.getChunkManager()).thenReturn(chunkManager); - when(blockManager.getBlock(any(), any())).thenReturn(blockData); - ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(0)); - when(chunkManager.readChunk(any(), any(), - any(), any())) - .thenReturn(data); - testReadBlock(0, 1, keyValueHandler, dispatcherContext, - streamObserver, container); - testReadBlock(0, CHUNK_SIZE + 1, keyValueHandler, dispatcherContext, - streamObserver, container); - testReadBlock(CHUNK_SIZE / 2, 2 * CHUNK_SIZE, keyValueHandler, dispatcherContext, - streamObserver, container); - } - } - - private static ContainerCommandRequestProto readBlockRequest( - long offset, long length) { - return ContainerCommandRequestProto.newBuilder() - .setCmdType(Type.ReadBlock) - .setReadBlock( - ContainerProtos.ReadBlockRequestProto.newBuilder() - .setBlockID( - ContainerProtos.DatanodeBlockID.newBuilder() - .setContainerID(DUMMY_CONTAINER_ID) - .setLocalID(LOCAL_ID)) - .setOffset(offset) - .setLen(length) - .setVerifyChecksum(true)) - .setContainerID(DUMMY_CONTAINER_ID) - .setDatanodeUuid(UUID.randomUUID().toString()) - .build(); - } - - private static void testReadBlock( - long offset, long length, KeyValueHandler keyValueHandler, DispatcherContext dispatcherContext, - StreamObserver streamObserver, KeyValueContainer container) { - int responseCount = (int) (((offset + length - 1) / CHUNK_SIZE) + 1 - (offset / CHUNK_SIZE)); - ContainerCommandRequestProto requestProto = readBlockRequest(offset, length); - keyValueHandler.readBlock(requestProto, container, dispatcherContext, streamObserver); - verify(streamObserver, times(responseCount)).onNext(any()); - clearInvocations(streamObserver); - } } diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index c55482445601..6b4d8f1bd7f9 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -401,12 +401,12 @@ message ListBlockResponseProto { message ReadBlockRequestProto { required DatanodeBlockID blockID = 1; required uint64 offset = 2; - required uint64 len = 3; - required bool verifyChecksum = 4; } message ReadBlockResponseProto { - repeated ReadChunkResponseProto readChunk = 1; + required ChecksumData checksumData = 1; + required uint64 offset = 2; + required bytes data = 3; } message EchoRequestProto { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java index 80ae5118467e..bb66a303155e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java +++ 
@@ -18,24 +18,21 @@
 package org.apache.hadoop.ozone.client.rpc.read;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
-import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.om.TestBucket;
+import org.junit.jupiter.api.Test;
 
 /**
  * Tests {@link StreamBlockInputStream}.
  */
@@ -45,8 +42,12 @@ public class TestStreamBlockInputStream extends TestInputStreamBase {
    * Run the tests as a single test method to avoid needing a new mini-cluster
    * for each test.
    */
-  @ContainerLayoutTestInfo.ContainerTest
-  void testAll(ContainerLayoutVersion layout) throws Exception {
+  private static final int DATA_LENGTH = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+  private byte[] inputData;
+  private TestBucket bucket;
+
+  @Test
+  void testAll() throws Exception {
     try (MiniOzoneCluster cluster = newCluster()) {
       cluster.waitForClusterToBeReady();
 
@@ -55,14 +56,22 @@ void testAll(ContainerLayoutVersion layout) throws Exception {
       clientConfig.setStreamReadBlock(true);
       OzoneConfiguration copy = new OzoneConfiguration(conf);
       copy.setFromObject(clientConfig);
+      String keyName = getNewKeyName();
       try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
-        updateConfig(layout);
-        TestBucket bucket = TestBucket.newBuilder(client).build();
-
-        testBlockReadBuffers(bucket);
-        testBufferRelease(bucket);
-        testCloseReleasesBuffers(bucket);
-        testReadEmptyBlock(bucket);
+        bucket = TestBucket.newBuilder(client).build();
+        inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+        testReadKeyFully(keyName);
+        testSeek(keyName);
+        testReadEmptyBlock();
+      }
+      keyName = getNewKeyName();
+      clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+      copy.setFromObject(clientConfig);
+      try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+        bucket = TestBucket.newBuilder(client).build();
+        inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+        testReadKeyFully(keyName);
+        testSeek(keyName);
       }
     }
   }
 
@@ -71,175 +80,66 @@ void testAll(ContainerLayoutVersion layout) throws Exception {
-   * Test to verify that data read from blocks is stored in a list of buffers
-   * with max capacity equal to the bytes per checksum.
+   * Test that a key can be read back fully via a byte array, single byte
+   * reads and a ByteBuffer, and that the data matches what was written.
    */
-  private void testBlockReadBuffers(TestBucket bucket) throws Exception {
-    String keyName = getNewKeyName();
-    int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
-    byte[] inputData = bucket.writeRandomBytes(keyName, dataLength);
-
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
-      StreamBlockInputStream block0Stream =
-          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-
-      // To read 1 byte of chunk data, ChunkInputStream should get one full
-      // checksum boundary worth of data from Container and store it in buffers.
-      IOUtils.readFully(block0Stream, new byte[1]);
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-      // Read > checksum boundary of data from chunk0
-      int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
-      byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
-      bucket.validateData(inputData, 0, readData);
-
-      // The first checksum boundary size of data was already existing in the
-      // ChunkStream buffers. Once that data is read, the next checksum
-      // boundary size of data will be fetched again to read the remaining data.
-      // Hence, there should be 1 checksum boundary size of data stored in the
-      // ChunkStreams buffers at the end of the read.
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-      // Seek to a position in the third checksum boundary (so that current
-      // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
-      // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
-      // data being read into the buffers. There should be 2 buffers in the
-      // stream but the first buffer should be released after it is read
-      // and the second buffer should have BYTES_PER_CHECKSUM capacity.
-      int offset = 2 * BYTES_PER_CHECKSUM + 1;
-      readData = readDataFromBlock(block0Stream, offset, readDataLen);
-      bucket.validateData(inputData, offset, readData);
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-
-      // Read the full chunk data -1 and verify that all chunk data is read into
-      // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
-      // released once all chunk data is read.
-      readData = readDataFromBlock(block0Stream, 0, CHUNK_SIZE - 1);
-      bucket.validateData(inputData, 0, readData);
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-      // Read the last byte of chunk and verify that the buffers are released.
-      IOUtils.readFully(block0Stream, new byte[1]);
-      assertNull(block0Stream.getCachedBuffers(),
-          "ChunkInputStream did not release buffers after reaching EOF.");
+  private void testReadKeyFully(String key) throws Exception {
+    // Read the data fully into a large enough byte array
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      byte[] readData = new byte[DATA_LENGTH];
+      int totalRead = keyInputStream.read(readData, 0, DATA_LENGTH);
+      assertEquals(DATA_LENGTH, totalRead);
+      for (int i = 0; i < DATA_LENGTH; i++) {
+        assertEquals(inputData[i], readData[i],
+            "Read data is not same as written data at index " + i);
+      }
     }
-  }
-
-  private void testCloseReleasesBuffers(TestBucket bucket) throws Exception {
-    String keyName = getNewKeyName();
-    bucket.writeRandomBytes(keyName, CHUNK_SIZE);
-
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-      StreamBlockInputStream block0Stream =
-          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-      readDataFromBlock(block0Stream, 0, 1);
-      assertNotNull(block0Stream.getCachedBuffers());
-
-      block0Stream.close();
-
-      assertNull(block0Stream.getCachedBuffers());
+    // Read the data 1 byte at a time
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      for (int i = 0; i < DATA_LENGTH; i++) {
+        int b = keyInputStream.read();
+        assertEquals(inputData[i], (byte) b,
+            "Read data is not same as written data at index " + i);
+      }
     }
-  }
-
-  /**
-   * Test that ChunkInputStream buffers are released as soon as the last byte
-   * of the buffer is read.
-   */
-  private void testBufferRelease(TestBucket bucket) throws Exception {
-    String keyName = getNewKeyName();
-    byte[] inputData = bucket.writeRandomBytes(keyName, CHUNK_SIZE);
-
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
-      StreamBlockInputStream block0Stream =
-          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-      // Read checksum boundary - 1 bytes of data
-      int readDataLen = BYTES_PER_CHECKSUM - 1;
-      byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
-      bucket.validateData(inputData, 0, readData);
-
-      // There should be 1 byte of data remaining in the buffer which is not
-      // yet read. Hence, the buffer should not be released.
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-      assertEquals(1, block0Stream.getCachedBuffers()[0].remaining());
-
-      // Reading the last byte in the buffer should result in all the buffers
-      // being released.
-      readData = readDataFromBlock(block0Stream, 1);
-      bucket.validateData(inputData, readDataLen, readData);
-      assertNull(block0Stream.getCachedBuffers(),
-          "Chunk stream buffers not released after last byte is read");
-
-      // Read more data to get the data till the next checksum boundary.
-      readDataLen = BYTES_PER_CHECKSUM / 2;
-      readDataFromBlock(block0Stream, readDataLen);
-      // There should be one buffer and the buffer should not be released as
-      // there is data pending to be read from the buffer
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-      ByteBuffer lastCachedBuffer = block0Stream.getCachedBuffers()[0];
-      assertEquals(BYTES_PER_CHECKSUM - readDataLen,
-          lastCachedBuffer.remaining());
-
-      // Read more than the remaining data in buffer (but less than the next
-      // checksum boundary).
-      int position = (int) block0Stream.getPos();
-      readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
-      readData = readDataFromBlock(block0Stream, readDataLen);
-      bucket.validateData(inputData, position, readData);
-      // After reading the remaining data in the buffer, the buffer should be
-      // released and next checksum size of data must be read into the buffers
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-      // Verify that the previously cached buffer is released by comparing it
-      // with the current cached buffer
-      assertNotEquals(lastCachedBuffer,
-          block0Stream.getCachedBuffers()[0]);
+    // Read the data into a large enough ByteBuffer
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      ByteBuffer readBuf = ByteBuffer.allocate(DATA_LENGTH);
+      int totalRead = keyInputStream.read(readBuf);
+      assertEquals(DATA_LENGTH, totalRead);
+      readBuf.flip();
+      for (int i = 0; i < DATA_LENGTH; i++) {
+        assertEquals(inputData[i], readBuf.get(),
+            "Read data is not same as written data at index " + i);
+      }
     }
   }
 
-  private byte[] readDataFromBlock(StreamBlockInputStream streamBlockInputStream,
-      int offset, int readDataLength) throws IOException {
-    byte[] readData = new byte[readDataLength];
-    streamBlockInputStream.seek(offset);
-    IOUtils.readFully(streamBlockInputStream, readData);
-    return readData;
-  }
-
-  private byte[] readDataFromBlock(StreamBlockInputStream streamBlockInputStream,
-      int readDataLength) throws IOException {
-    byte[] readData = new byte[readDataLength];
-    IOUtils.readFully(streamBlockInputStream, readData);
-    return readData;
-  }
-
-  /**
-   * Verify number of buffers and their capacities.
-   * @param buffers chunk stream buffers
-   */
-  private void checkBufferSizeAndCapacity(ByteBuffer[] buffers) {
-    assertEquals(1, buffers.length,
-        "ChunkInputStream does not have expected number of " +
-            "ByteBuffers");
-    for (ByteBuffer buffer : buffers) {
-      assertEquals(BYTES_PER_CHECKSUM, buffer.capacity(),
-          "ChunkInputStream ByteBuffer capacity is wrong");
+  private void testSeek(String key) throws IOException {
+    java.util.Random random = new java.util.Random();
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      for (int i = 0; i < 100; i++) {
+        int position = random.nextInt(DATA_LENGTH);
+        keyInputStream.seek(position);
+        int b = keyInputStream.read();
+        assertEquals(inputData[position], (byte) b, "Read data is not same as written data at index " + position);
+      }
+      StreamBlockInputStream blockStream = (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+      long length = blockStream.getLength();
+      blockStream.seek(10);
+      long position = blockStream.getPos();
+      assertThrows(IOException.class, () -> blockStream.seek(length + 1),
+          "Seek beyond block length should throw exception");
+      assertThrows(IOException.class, () -> blockStream.seek(-1),
+          "Seeking to a negative position should throw exception");
+      assertEquals(position, blockStream.getPos(),
+          "Position should not change after failed seek attempts");
     }
   }
 
-  private void testReadEmptyBlock(TestBucket bucket) throws Exception {
+  private void testReadEmptyBlock() throws Exception {
     String keyName = getNewKeyName();
-    int dataLength = 10;
     bucket.writeRandomBytes(keyName, 0);
-
     try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
-      byte[] readData = new byte[dataLength];
       assertTrue(keyInputStream.getPartStreams().isEmpty());
-      IOUtils.read(keyInputStream, readData);
-      for (byte b : readData) {
-        assertEquals((byte) 0, b);
-      }
+      assertEquals(-1, keyInputStream.read());
     }
   }
 }
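
Illustration (not part of the patch): with the ReadBlockRequestProto/ReadBlockResponseProto shapes above, a ReadBlock command now identifies only the block and a starting offset, and each streamed response carries one slice of the block's data together with its offset and checksum metadata. The sketch below shows one way a caller might build such a request and collect the streamed slices. It is a hedged example only: it assumes the command wrappers expose the ReadBlock payloads via setReadBlock()/getReadBlock(), as they do for the other command types, and that the streamed message type is ContainerCommandResponseProto; the class and method names are hypothetical.

import java.nio.ByteBuffer;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;

/** Illustrative sketch only; not part of this change. */
final class ReadBlockSketch {

  /** Build a ReadBlock command for the updated request message. */
  static ContainerCommandRequestProto buildRequest(long containerId, long localId, String datanodeUuid) {
    return ContainerCommandRequestProto.newBuilder()
        .setCmdType(ContainerProtos.Type.ReadBlock)
        .setContainerID(containerId)
        .setDatanodeUuid(datanodeUuid)
        .setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder()
            .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder()
                .setContainerID(containerId)
                .setLocalID(localId))
            // Only the starting offset remains; len and verifyChecksum were removed.
            .setOffset(0))
        .build();
  }

  /** Accumulate the streamed slices of a block into the supplied buffer. */
  static StreamObserver<ContainerCommandResponseProto> collectInto(ByteBuffer buffer) {
    return new StreamObserver<ContainerCommandResponseProto>() {
      @Override
      public void onNext(ContainerCommandResponseProto response) {
        // Assumes the response wrapper exposes the payload via getReadBlock(),
        // mirroring the other command types.
        ContainerProtos.ReadBlockResponseProto slice = response.getReadBlock();
        // slice.getOffset() and slice.getChecksumData() can be used for
        // position tracking and per-slice checksum verification.
        buffer.put(slice.getData().asReadOnlyByteBuffer());
      }

      @Override
      public void onError(Throwable t) {
        // Surface the failure to the caller, e.g. wrapped in an IOException.
      }

      @Override
      public void onCompleted() {
        // Streaming read finished; release any per-stream resources here.
      }
    };
  }

  private ReadBlockSketch() {
  }
}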