diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 5dc44f4d4ec5..7329f2c16b74 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -113,6 +113,13 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private long streamBufferMaxSize = 32 * 1024 * 1024; + @Config(key = "stream.readblock.enable", + defaultValue = "false", + type = ConfigType.BOOLEAN, + description = "Allow ReadBlock to stream all the readChunk in one request.", + tags = ConfigTag.CLIENT) + private boolean streamReadBlock = false; + @Config(key = "ozone.client.max.retries", defaultValue = "5", description = "Maximum number of retries by Ozone Client on " @@ -151,7 +158,7 @@ public class OzoneClientConfig { description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] " + "determines which algorithm would be used to compute checksum for " + "chunk data. Default checksum type is CRC32.", - tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE }) + tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE}) private String checksumType = ChecksumType.CRC32.name(); @Config(key = "ozone.client.bytes.per.checksum", @@ -160,7 +167,7 @@ public class OzoneClientConfig { description = "Checksum will be computed for every bytes per checksum " + "number of bytes and stored sequentially. The minimum value for " + "this config is 8KB.", - tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE }) + tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE}) private int bytesPerChecksum = 16 * 1024; @Config(key = "ozone.client.verify.checksum", @@ -538,6 +545,14 @@ public int getMaxConcurrentWritePerKey() { return this.maxConcurrentWritePerKey; } + public boolean isStreamReadBlock() { + return streamReadBlock; + } + + public void setStreamReadBlock(boolean streamReadBlock) { + this.streamReadBlock = streamReadBlock; + } + /** * Enum for indicating what mode to use when combining chunk and block * checksums to define an aggregate FileChecksum. This should be considered diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index b07cee4097c0..923a61e384e2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -384,25 +384,9 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry( }); } - private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List validators) - throws IOException { - ContainerCommandResponseProto responseProto = null; - IOException ioException = null; - - // In case of an exception or an error, we will try to read from the - // datanodes in the pipeline in a round-robin fashion. 
- XceiverClientReply reply = new XceiverClientReply(null); + private List sortDatanodes(ContainerCommandRequestProto request) throws IOException { List datanodeList = null; - - DatanodeBlockID blockID = null; - if (request.getCmdType() == ContainerProtos.Type.GetBlock) { - blockID = request.getGetBlock().getBlockID(); - } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { - blockID = request.getReadChunk().getBlockID(); - } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) { - blockID = request.getGetSmallFile().getBlock().getBlockID(); - } + DatanodeBlockID blockID = getRequestBlockID(request); if (blockID != null) { if (request.getCmdType() != ContainerProtos.Type.ReadChunk) { @@ -440,6 +424,33 @@ private XceiverClientReply sendCommandWithRetry( if (!allInService) { datanodeList = sortDatanodeByOperationalState(datanodeList); } + return datanodeList; + } + + private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto request) { + DatanodeBlockID blockID = null; + if (request.getCmdType() == ContainerProtos.Type.GetBlock) { + blockID = request.getGetBlock().getBlockID(); + } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { + blockID = request.getReadChunk().getBlockID(); + } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) { + blockID = request.getGetSmallFile().getBlock().getBlockID(); + } else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) { + blockID = request.getReadBlock().getBlockID(); + } + return blockID; + } + + private XceiverClientReply sendCommandWithRetry( + ContainerCommandRequestProto request, List validators) + throws IOException { + ContainerCommandResponseProto responseProto = null; + IOException ioException = null; + + // In case of an exception or an error, we will try to read from the + // datanodes in the pipeline in a round-robin fashion. + XceiverClientReply reply = new XceiverClientReply(null); + List datanodeList = sortDatanodes(request); for (DatanodeDetails dn : datanodeList) { try { @@ -495,7 +506,7 @@ private XceiverClientReply sendCommandWithRetry( String message = "Failed to execute command {}"; if (LOG.isDebugEnabled()) { LOG.debug(message + " on the pipeline {}.", - processForDebug(request), pipeline); + processForDebug(request), pipeline); } else { LOG.warn(message + " on the pipeline {}.", request.getCmdType(), pipeline); @@ -504,6 +515,62 @@ private XceiverClientReply sendCommandWithRetry( } } + /** + * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a + * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode. + * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set into the pass StreamObserver, + * which contains information about the datanode used for the read, and the request observer that can be used to + * manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent + * streaming reads so upon successful return of this method, the caller must ensure to call + * {@link #completeStreamRead()} to release the semaphore once the streaming read is complete. + * @param request The container command request to initiate the streaming read. 
+ * @param streamObserver The observer that will handle the streamed responses.= + * @throws IOException + * @throws InterruptedException + */ + @Override + public void streamRead(ContainerCommandRequestProto request, + StreamingReaderSpi streamObserver) throws IOException, InterruptedException { + List datanodeList = sortDatanodes(request); + IOException lastException = null; + for (DatanodeDetails dn : datanodeList) { + try { + checkOpen(dn); + semaphore.acquire(); + XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID()); + if (stub == null) { + throw new IOException("Failed to get gRPC stub for DataNode: " + dn); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Executing command {} on datanode {}", processForDebug(request), dn); + } + stub.withDeadlineAfter(timeout, TimeUnit.SECONDS) + .streamBlock(request, streamObserver); + streamObserver.setStreamingDatanode(dn); + return; + } catch (IOException e) { + LOG.error("Failed to start streaming read to DataNode {}", dn, e); + semaphore.release(); + lastException = e; + } + } + if (lastException != null) { + throw lastException; + } else { + throw new IOException("Failed to start streaming read to any available DataNodes"); + } + } + + /** + * This method should be called to indicate the end of streaming read. Its primary purpose is to release the + * semaphore acquired when starting the streaming read, but is also used to update any metrics or debug logs as + * needed. + */ + @Override + public void completeStreamRead() { + semaphore.release(); + } + private static List sortDatanodeByOperationalState( List datanodeList) { List sortedDatanodeList = new ArrayList<>(datanodeList); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java index 6753f600a91b..dc47a9edacaf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java @@ -17,7 +17,20 @@ package org.apache.hadoop.hdds.scm.storage; +import java.io.IOException; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.token.Token; +import org.apache.ratis.thirdparty.io.grpc.Status; /** * Abstract class used as an interface for input streams related to Ozone @@ -26,6 +39,8 @@ public abstract class BlockExtendedInputStream extends ExtendedInputStream implements PartInputStream { + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(BlockExtendedInputStream.class); + public abstract BlockID getBlockID(); @Override @@ -38,4 +53,86 @@ public long getRemaining() { @Override public abstract long getPos(); + + protected Pipeline setPipeline(Pipeline pipeline) throws IOException { + if (pipeline == null) { + return null; + } + long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count(); + + if 
(replicaIndexes > 1) { + throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.", + pipeline)); + } + + // irrespective of the container state, we will always read via Standalone protocol. + boolean okForRead = pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE + || pipeline.getType() == HddsProtos.ReplicationType.EC; + return okForRead ? pipeline : pipeline.copyForRead(); + } + + protected boolean shouldRetryRead(IOException cause, RetryPolicy retryPolicy, int retries) throws IOException { + RetryPolicy.RetryAction retryAction; + try { + retryAction = retryPolicy.shouldRetry(cause, retries, 0, true); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { + if (retryAction.delayMillis > 0) { + try { + LOG.debug("Retry read after {}ms", retryAction.delayMillis); + Thread.sleep(retryAction.delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy; + throw new IOException(msg, e); + } + } + return true; + } + return false; + } + + protected RetryPolicy getReadRetryPolicy(OzoneClientConfig config) { + return HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), + TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); + } + + protected void refreshBlockInfo(IOException cause, BlockID blockID, AtomicReference pipelineRef, + AtomicReference> tokenRef, Function refreshFunction) + throws IOException { + LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}", + blockID, pipelineRef.get().getId(), cause.getMessage()); + if (refreshFunction != null) { + LOG.debug("Re-fetching pipeline and block token for block {}", blockID); + BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID); + if (blockLocationInfo == null) { + LOG.warn("No new block location info for block {}", blockID); + } else { + pipelineRef.set(setPipeline(blockLocationInfo.getPipeline())); + LOG.info("New pipeline for block {}: {}", blockID, blockLocationInfo.getPipeline()); + + tokenRef.set(blockLocationInfo.getToken()); + if (blockLocationInfo.getToken() != null) { + OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(); + tokenId.readFromByteArray(tokenRef.get().getIdentifier()); + LOG.info("A new token is added for block {}. Expiry: {}", blockID, + Instant.ofEpochMilli(tokenId.getExpiryDate())); + } + } + } else { + throw cause; + } + } + + /** + * Check if this exception is because datanodes are not reachable. 
+ */ + protected boolean isConnectivityIssue(IOException ex) { + return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); + } + } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index c9731bcc4611..6f6b513422f7 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -22,11 +22,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; @@ -35,19 +33,16 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.exception.SCMSecurityException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.security.token.Token; -import org.apache.ratis.thirdparty.io.grpc.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,14 +118,12 @@ public BlockInputStream( this.blockInfo = blockInfo; this.blockID = blockInfo.getBlockID(); this.length = blockInfo.getLength(); - setPipeline(pipeline); + pipelineRef.set(setPipeline(pipeline)); tokenRef.set(token); this.verifyChecksum = config.isChecksumVerify(); this.xceiverClientFactory = xceiverClientFactory; this.refreshFunction = refreshFunction; - this.retryPolicy = - HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), - TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); + this.retryPolicy = getReadRetryPolicy(config); } // only for unit tests @@ -182,7 +175,7 @@ public synchronized void initialize() throws IOException { } catchEx = ex; } - } while (shouldRetryRead(catchEx)); + } while (shouldRetryRead(catchEx, retryPolicy, ++retries)); if (chunks == null) { throw catchEx; @@ -215,37 +208,8 @@ public synchronized void initialize() throws IOException { } } - /** - * Check if this exception is because datanodes are not reachable. 
- */ - private boolean isConnectivityIssue(IOException ex) { - return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); - } - private void refreshBlockInfo(IOException cause) throws IOException { - LOG.info("Attempting to update pipeline and block token for block {} from pipeline {}: {}", - blockID, pipelineRef.get().getId(), cause.getMessage()); - if (refreshFunction != null) { - LOG.debug("Re-fetching pipeline and block token for block {}", blockID); - BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID); - if (blockLocationInfo == null) { - LOG.warn("No new block location info for block {}", blockID); - } else { - setPipeline(blockLocationInfo.getPipeline()); - LOG.info("New pipeline for block {}: {}", blockID, - blockLocationInfo.getPipeline()); - - tokenRef.set(blockLocationInfo.getToken()); - if (blockLocationInfo.getToken() != null) { - OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(); - tokenId.readFromByteArray(tokenRef.get().getIdentifier()); - LOG.info("A new token is added for block {}. Expiry: {}", - blockID, Instant.ofEpochMilli(tokenId.getExpiryDate())); - } - } - } else { - throw cause; - } + refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction); } /** @@ -277,26 +241,6 @@ protected BlockData getBlockDataUsingClient() throws IOException { return response.getBlockData(); } - private void setPipeline(Pipeline pipeline) throws IOException { - if (pipeline == null) { - return; - } - long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count(); - - if (replicaIndexes > 1) { - throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.", - pipeline)); - } - - // irrespective of the container state, we will always read via Standalone - // protocol. - boolean okForRead = - pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE - || pipeline.getType() == HddsProtos.ReplicationType.EC; - Pipeline readPipeline = okForRead ? 
pipeline : pipeline.copyForRead(); - pipelineRef.set(readPipeline); - } - private static void validate(ContainerCommandResponseProto response) throws IOException { if (!response.hasGetBlock()) { @@ -382,14 +326,14 @@ protected synchronized int readWithStrategy(ByteReaderStrategy strategy) } catch (SCMSecurityException ex) { throw ex; } catch (StorageContainerException e) { - if (shouldRetryRead(e)) { + if (shouldRetryRead(e, retryPolicy, ++retries)) { handleReadError(e); continue; } else { throw e; } } catch (IOException ex) { - if (shouldRetryRead(ex)) { + if (shouldRetryRead(ex, retryPolicy, ++retries)) { if (isConnectivityIssue(ex)) { handleReadError(ex); } else { @@ -573,31 +517,6 @@ private synchronized void storePosition() { blockPosition = getPos(); } - private boolean shouldRetryRead(IOException cause) throws IOException { - RetryPolicy.RetryAction retryAction; - try { - retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(e); - } - if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { - if (retryAction.delayMillis > 0) { - try { - LOG.debug("Retry read after {}ms", retryAction.delayMillis); - Thread.sleep(retryAction.delayMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - String msg = "Interrupted: action=" + retryAction.action + ", retry policy=" + retryPolicy; - throw new IOException(msg, e); - } - } - return true; - } - return false; - } - private void handleReadError(IOException cause) throws IOException { releaseClient(); final List inputStreams = this.chunkStreams; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java new file mode 100644 index 000000000000..6284bd8b35f1 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.storage; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamingReaderSpi; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.security.token.Token; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link java.io.InputStream} called from KeyInputStream to read a block from the + * container. 
+ */ +public class StreamBlockInputStream extends BlockExtendedInputStream + implements Seekable, CanUnbuffer, ByteBufferReadable { + private static final Logger LOG = LoggerFactory.getLogger(StreamBlockInputStream.class); + private static final int EOF = -1; + private static final Throwable CANCELLED_EXCEPTION = new Throwable("Cancelled by client"); + + private final BlockID blockID; + private final long blockLength; + private final AtomicReference pipelineRef = new AtomicReference<>(); + private final AtomicReference> tokenRef = new AtomicReference<>(); + private XceiverClientFactory xceiverClientFactory; + private XceiverClientSpi xceiverClient; + + private ByteBuffer buffer; + private long position = 0; + private boolean initialized = false; + private StreamingReader streamingReader; + + private final boolean verifyChecksum; + private final Function refreshFunction; + private final RetryPolicy retryPolicy; + private int retries = 0; + + public StreamBlockInputStream( + BlockID blockID, long length, Pipeline pipeline, + Token token, + XceiverClientFactory xceiverClientFactory, + Function refreshFunction, + OzoneClientConfig config) throws IOException { + this.blockID = blockID; + this.blockLength = length; + pipelineRef.set(setPipeline(pipeline)); + tokenRef.set(token); + this.xceiverClientFactory = xceiverClientFactory; + this.verifyChecksum = config.isChecksumVerify(); + this.retryPolicy = getReadRetryPolicy(config); + this.refreshFunction = refreshFunction; + } + + @Override + public BlockID getBlockID() { + return blockID; + } + + @Override + public long getLength() { + return blockLength; + } + + @Override + public synchronized long getPos() { + return position; + } + + @Override + public synchronized int read() throws IOException { + checkOpen(); + if (!dataAvailableToRead()) { + return EOF; + } + position++; + return buffer.get(); + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + ByteBuffer tmpBuffer = ByteBuffer.wrap(b, off, len); + return read(tmpBuffer); + } + + @Override + public synchronized int read(ByteBuffer targetBuf) throws IOException { + checkOpen(); + int read = 0; + while (targetBuf.hasRemaining()) { + if (!dataAvailableToRead()) { + break; + } + int toCopy = Math.min(buffer.remaining(), targetBuf.remaining()); + ByteBuffer tmpBuf = buffer.duplicate(); + tmpBuf.limit(tmpBuf.position() + toCopy); + targetBuf.put(tmpBuf); + buffer.position(tmpBuf.position()); + position += toCopy; + read += toCopy; + } + return read > 0 ? read : EOF; + } + + private boolean dataAvailableToRead() throws IOException { + if (position >= blockLength) { + return false; + } + initialize(); + if (buffer == null || buffer.remaining() == 0) { + int loaded = fillBuffer(); + return loaded != EOF; + } + return true; + } + + @Override + protected int readWithStrategy(ByteReaderStrategy strategy) throws IOException { + throw new NotImplementedException("readWithStrategy is not implemented."); + } + + @Override + public synchronized void seek(long pos) throws IOException { + checkOpen(); + if (pos < 0) { + throw new IOException("Cannot seek to negative offset"); + } + if (pos > blockLength) { + throw new IOException("Cannot seek after the end of the block"); + } + if (pos == position) { + return; + } + closeStream(); + position = pos; + } + + @Override + // The seekable interface indicates that seekToNewSource should seek to a new source of the data, + // ie a different datanode. This is not supported for now. 
+  public synchronized boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void unbuffer() {
+    releaseClient();
+  }
+
+  private void closeStream() {
+    if (streamingReader != null) {
+      streamingReader.cancel();
+      streamingReader = null;
+    }
+    initialized = false;
+    buffer = null;
+  }
+
+  protected synchronized void checkOpen() throws IOException {
+    if (xceiverClientFactory == null) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Block: " + blockID);
+    }
+  }
+
+  protected synchronized void acquireClient() throws IOException {
+    checkOpen();
+    if (xceiverClient == null) {
+      final Pipeline pipeline = pipelineRef.get();
+      try {
+        xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to acquire client for pipeline {}, block {}", pipeline, blockID);
+        throw ioe;
+      }
+    }
+  }
+
+  private void initialize() throws IOException {
+    if (initialized) {
+      return;
+    }
+    while (true) {
+      try {
+        acquireClient();
+        streamingReader = new StreamingReader();
+        ContainerProtocolCalls.readBlock(xceiverClient, position, blockID, tokenRef.get(),
+            pipelineRef.get().getReplicaIndexes(), streamingReader);
+        initialized = true;
+        return;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        handleExceptions(new IOException("Interrupted", ie));
+      } catch (IOException ioe) {
+        handleExceptions(ioe);
+      }
+    }
+  }
+
+  private void handleExceptions(IOException cause) throws IOException {
+    if (cause instanceof StorageContainerException || isConnectivityIssue(cause)) {
+      if (shouldRetryRead(cause, retryPolicy, retries++)) {
+        releaseClient();
+        refreshBlockInfo(cause);
+        LOG.warn("Refreshing block data to read block {} due to {}", blockID, cause.getMessage());
+      } else {
+        throw cause;
+      }
+    } else {
+      throw cause;
+    }
+  }
+
+  private int fillBuffer() throws IOException {
+    if (!streamingReader.hasNext()) {
+      return EOF;
+    }
+    buffer = streamingReader.readNext();
+    return buffer == null ? EOF : buffer.limit();
+  }
+
+  protected synchronized void releaseClient() {
+    if (xceiverClientFactory != null && xceiverClient != null) {
+      closeStream();
+      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
+      xceiverClient = null;
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    releaseClient();
+    xceiverClientFactory = null;
+  }
+
+  private void refreshBlockInfo(IOException cause) throws IOException {
+    refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
+  }
+
+  private synchronized void releaseStreamResources() {
+    if (xceiverClient != null) {
+      xceiverClient.completeStreamRead();
+    }
+  }
+
+  /**
+   * Implementation of a StreamObserver used to receive and buffer streaming gRPC reads.
+   */
+  public class StreamingReader implements StreamingReaderSpi {
+
+    private final BlockingQueue responseQueue = new LinkedBlockingQueue<>(1);
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+    private final AtomicBoolean semaphoreReleased = new AtomicBoolean(false);
+    private final AtomicReference error = new AtomicReference<>();
+    private volatile ClientCallStreamObserver requestObserver;
+    private volatile DatanodeDetails streamingDatanodeDetails;
+
+    public boolean hasNext() {
+      return !responseQueue.isEmpty() || !completed.get();
+    }
+
+    public ByteBuffer readNext() throws IOException {
+      if (failed.get()) {
+        Throwable cause = error.get();
+        throw new IOException("Streaming read failed", cause);
+      }
+
+      if (completed.get() && responseQueue.isEmpty()) {
+        return null; // Stream ended
+      }
+
+      ReadBlockResponseProto readBlock;
+      try {
+        readBlock = responseQueue.poll(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for response", e);
+      }
+      if (readBlock == null) {
+        if (failed.get()) {
+          Throwable cause = error.get();
+          throw new IOException("Streaming read failed", cause);
+        } else if (completed.get()) {
+          return null; // Stream ended
+        } else {
+          throw new IOException("Timed out waiting for response");
+        }
+      }
+      // The server always returns data starting from the last checksum boundary. Therefore if the reader position is
+      // ahead of the position we received from the server, we need to adjust the buffer position accordingly.
+      // If the reader position is behind the offset received from the server, the data is out of order and the
+      // read fails.
+      ByteBuffer buf = readBlock.getData().asReadOnlyByteBuffer();
+      long blockOffset = readBlock.getOffset();
+      long pos = getPos();
+      if (pos < blockOffset) {
+        // This should not happen, and if it does, we have a bug.
+        throw new IOException("Received data out of order. Position is " + pos + " but received data at " +
+            blockOffset);
+      }
+      if (pos > readBlock.getOffset()) {
+        int offset = (int)(pos - readBlock.getOffset());
+        buf.position(offset);
+      }
+      return buf;
+    }
+
+    private void releaseResources() {
+      boolean wasNotYetComplete = !semaphoreReleased.getAndSet(true);
+      if (wasNotYetComplete) {
+        releaseStreamResources();
+      }
+    }
+
+    @Override
+    public void onNext(ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto) {
+      try {
+        ReadBlockResponseProto readBlock = containerCommandResponseProto.getReadBlock();
+        ByteBuffer data = readBlock.getData().asReadOnlyByteBuffer();
+        if (verifyChecksum) {
+          ChecksumData checksumData = ChecksumData.getFromProtoBuf(readBlock.getChecksumData());
+          Checksum.verifyChecksum(data, checksumData, 0);
+        }
+        offerToQueue(readBlock);
+      } catch (OzoneChecksumException e) {
+        LOG.warn("Checksum verification failed for block {} from datanode {}",
+            getBlockID(), streamingDatanodeDetails, e);
+        cancelDueToError(e);
+      }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      if (throwable instanceof StatusRuntimeException) {
+        if (((StatusRuntimeException) throwable).getStatus().getCode().name().equals("CANCELLED")) {
+          // This is expected when the client cancels the stream.
+ setCompleted(); + } + } else { + setFailed(throwable); + } + releaseResources(); + } + + @Override + public void onCompleted() { + setCompleted(); + releaseResources(); + } + + /** + * By calling cancel, the client will send a cancel signal to the server, which will stop sending more data and + * cause the onError() to be called in this observer with a CANCELLED exception. + */ + public void cancel() { + if (requestObserver != null) { + requestObserver.cancel("Cancelled by client", CANCELLED_EXCEPTION); + setCompleted(); + releaseResources(); + } + } + + public void cancelDueToError(Throwable exception) { + if (requestObserver != null) { + requestObserver.cancel("Cancelled by client due to error", CANCELLED_EXCEPTION); + setFailed(exception); + releaseResources(); + } + } + + private void setFailed(Throwable throwable) { + if (completed.get()) { + throw new IllegalArgumentException("Cannot mark a completed stream as failed"); + } + failed.set(true); + error.set(throwable); + } + + private void setCompleted() { + if (!failed.get()) { + completed.set(true); + } + } + + private void offerToQueue(ReadBlockResponseProto item) { + while (!completed.get() && !failed.get()) { + try { + if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) { + return; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + @Override + public void setStreamingDatanode(DatanodeDetails datanodeDetails) { + streamingDatanodeDetails = datanodeDetails; + } + + @Override + public void beforeStart( + ClientCallStreamObserver clientCallStreamObserver) { + this.requestObserver = clientCallStreamObserver; + } + } + +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java index 3fbee6be871b..2765e2a00a8f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.client.io; +import static org.apache.hadoop.hdds.DatanodeVersion.STREAM_BLOCK_SUPPORT; + import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -25,6 +27,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; @@ -32,6 +35,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; +import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool; @@ -85,6 +89,9 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig, return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig, blockInfo, xceiverFactory, refreshFunction, ecBlockStreamFactory, config); + } else if (config.isStreamReadBlock() && allDataNodesSupportStreamBlock(pipeline)) { + return new 
StreamBlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), pipeline, token, xceiverFactory, + refreshFunction, config); } else { return new BlockInputStream(blockInfo, pipeline, token, xceiverFactory, refreshFunction, @@ -92,4 +99,15 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig, } } + private boolean allDataNodesSupportStreamBlock(Pipeline pipeline) { + // return true only if all DataNodes in the pipeline are on a version + // that supports for reading a block by streaming chunks.. + for (DatanodeDetails dn : pipeline.getNodes()) { + if (dn.getCurrentVersion() < STREAM_BLOCK_SUPPORT.toProtoValue()) { + return false; + } + } + return true; + } + } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 2e9a84cad429..1b5e7667e845 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -267,8 +267,8 @@ public void testSeekAndRead() throws Exception { @Test public void testRefreshPipelineFunction() throws Exception { - LogCapturer logCapturer = LogCapturer.captureLogs(BlockInputStream.class); - GenericTestUtils.setLogLevel(BlockInputStream.class, Level.DEBUG); + LogCapturer logCapturer = LogCapturer.captureLogs(BlockExtendedInputStream.class); + GenericTestUtils.setLogLevel(BlockExtendedInputStream.class, Level.DEBUG); BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); AtomicBoolean isRefreshed = new AtomicBoolean(); createChunkList(5); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java new file mode 100644 index 000000000000..3fa1e8ac3567 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusException;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.invocation.InvocationOnMock;
+
+/**
+ * Tests for {@link StreamBlockInputStream}'s functionality.
+ */ +public class TestStreamBlockInputStream { + private static final int BYTES_PER_CHECKSUM = 1024; + private static final int BLOCK_SIZE = 1024; + private StreamBlockInputStream blockStream; + private final OzoneConfiguration conf = new OzoneConfiguration(); + private XceiverClientFactory xceiverClientFactory; + private XceiverClientGrpc xceiverClient; + private Checksum checksum; + private ChecksumData checksumData; + private byte[] data; + private ClientCallStreamObserver requestObserver; + private Function refreshFunction; + + @BeforeEach + public void setup() throws Exception { + Token token = mock(Token.class); + when(token.encodeToUrlString()).thenReturn("url"); + + Set modes = + Collections.singleton(HddsProtos.BlockTokenSecretProto.AccessModeProto.READ); + OzoneBlockTokenIdentifier tokenIdentifier = new OzoneBlockTokenIdentifier("owner", new BlockID(1, 1), + modes, Time.monotonicNow() + 10000, 10); + tokenIdentifier.setSecretKeyId(UUID.randomUUID()); + when(token.getIdentifier()).thenReturn(tokenIdentifier.getBytes()); + Pipeline pipeline = MockPipeline.createSingleNodePipeline(); + + BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class); + when(blockLocationInfo.getPipeline()).thenReturn(pipeline); + when(blockLocationInfo.getToken()).thenReturn(token); + + xceiverClient = mock(XceiverClientGrpc.class); + when(xceiverClient.getPipeline()).thenReturn(pipeline); + xceiverClientFactory = mock(XceiverClientFactory.class); + when(xceiverClientFactory.acquireClientForReadData(any())) + .thenReturn(xceiverClient); + requestObserver = mock(ClientCallStreamObserver.class); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamReadBlock(true); + clientConfig.setMaxReadRetryCount(1); + refreshFunction = mock(Function.class); + when(refreshFunction.apply(any())).thenReturn(blockLocationInfo); + BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); + checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM); + createDataAndChecksum(); + blockStream = new StreamBlockInputStream(blockID, BLOCK_SIZE, pipeline, + token, xceiverClientFactory, refreshFunction, clientConfig); + } + + @AfterEach + public void teardown() { + if (blockStream != null) { + try { + blockStream.close(); + } catch (IOException e) { + // ignore + } + } + } + + @Test + public void testCloseStreamReleasesResources() throws IOException, InterruptedException { + setupSuccessfulRead(); + assertEquals(data[0], blockStream.read()); + blockStream.close(); + // Verify that cancel() was called on the requestObserver mock + verify(requestObserver).cancel(any(), any()); + // Verify that release() was called on the xceiverClient mock + verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false); + verify(xceiverClient, times(1)).completeStreamRead(); + } + + @Test + public void testUnbufferReleasesResourcesAndResumesFromLastPosition() throws IOException, InterruptedException { + setupSuccessfulRead(); + assertEquals(data[0], blockStream.read()); + assertEquals(1, blockStream.getPos()); + blockStream.unbuffer(); + // Verify that cancel() was called on the requestObserver mock + verify(requestObserver).cancel(any(), any()); + // Verify that release() was called on the xceiverClient mock + verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false); + verify(xceiverClient, times(1)).completeStreamRead(); + // The next read should "rebuffer" and continue from the last position + assertEquals(data[1], blockStream.read()); + 
assertEquals(2, blockStream.getPos());
+  }
+
+  @Test
+  public void testSeekReleasesTheStreamAndStartsFromNewPosition() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertEquals(data[0], blockStream.read());
+    blockStream.seek(100);
+    assertEquals(100, blockStream.getPos());
+    // Verify that cancel() was called on the requestObserver mock
+    verify(requestObserver).cancel(any(), any());
+    verify(xceiverClient, times(1)).completeStreamRead();
+    // The xceiverClient should not be released
+    verify(xceiverClientFactory, never())
+        .releaseClientForReadData(xceiverClient, false);
+
+    assertEquals(data[100], blockStream.read());
+    assertEquals(101, blockStream.getPos());
+  }
+
+  @Test
+  public void testErrorThrownIfStreamReturnsError() throws IOException, InterruptedException {
+    // Note the error will only be thrown when the buffer needs to be refilled. In this case, as it's the first
+    // read, it will try to fill the buffer and encounter the error, but a reader could continue reading until the
+    // buffer is exhausted before seeing the error.
+    doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      streamObserver.setStreamingDatanode(MockDatanodeDetails.randomDatanodeDetails());
+      streamObserver.beforeStart(requestObserver);
+      streamObserver.onError(new IOException("Test induced error"));
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+    assertThrows(IOException.class, () -> blockStream.read());
+    verify(xceiverClient, times(0)).completeStreamRead();
+  }
+
+  @Test
+  public void seekOutOfBounds() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertThrows(IOException.class, () -> blockStream.seek(-1));
+    assertThrows(IOException.class, () -> blockStream.seek(BLOCK_SIZE + 1));
+  }
+
+  @Test
+  public void readPastEOFReturnsEOF() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    blockStream.seek(BLOCK_SIZE);
+    // Ensure the stream is at EOF even after two attempts to read
+    assertEquals(-1, blockStream.read());
+    assertEquals(-1, blockStream.read());
+    assertEquals(BLOCK_SIZE, blockStream.getPos());
+  }
+
+  @Test
+  public void ensureExceptionThrownForReadAfterClosed() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    blockStream.close();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+    byte[] byteArray = new byte[10];
+    assertThrows(IOException.class, () -> blockStream.read());
+    assertThrows(IOException.class, () -> {
+      // Findbugs complains about ignored return value without this :(
+      int r = blockStream.read(byteArray, 0, 10);
+    });
+    assertThrows(IOException.class, () -> blockStream.read(byteBuffer));
+    assertThrows(IOException.class, () -> blockStream.seek(10));
+  }
+
+  @ParameterizedTest
+  @MethodSource("exceptionsTriggeringRefresh")
+  public void testRefreshFunctionCalledForAllDNsBadOnInitialize(IOException thrown)
+      throws IOException, InterruptedException {
+    // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline.
+
+    doAnswer((InvocationOnMock invocation) -> {
+      throw thrown;
+    }).doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      streamObserver.setStreamingDatanode(MockDatanodeDetails.randomDatanodeDetails());
+      streamObserver.beforeStart(requestObserver);
+      streamObserver.onNext(createChunkResponse(false));
+      streamObserver.onCompleted();
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+
blockStream.read(); + verify(refreshFunction, times(1)).apply(any()); + } + + @ParameterizedTest + @MethodSource("exceptionsNotTriggeringRefresh") + public void testRefreshNotCalledForAllDNsBadOnInitialize(IOException thrown) + throws IOException, InterruptedException { + // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline + doAnswer((InvocationOnMock invocation) -> { + throw thrown; + }).when(xceiverClient).streamRead(any(), any()); + assertThrows(IOException.class, () -> blockStream.read()); + verify(refreshFunction, times(0)).apply(any()); + } + + @Test + public void testExceptionThrownAfterRetriesExhausted() throws IOException, InterruptedException { + // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline + doAnswer((InvocationOnMock invocation) -> { + throw new StorageContainerException(CONTAINER_NOT_FOUND); + }).when(xceiverClient).streamRead(any(), any()); + + assertThrows(IOException.class, () -> blockStream.read()); + verify(refreshFunction, times(1)).apply(any()); + } + + @Test + public void testInvalidChecksumThrowsException() throws IOException, InterruptedException { + doAnswer((InvocationOnMock invocation) -> { + StreamingReaderSpi streamObserver = invocation.getArgument(1); + streamObserver.setStreamingDatanode(MockDatanodeDetails.randomDatanodeDetails()); + streamObserver.beforeStart(requestObserver); + streamObserver.onNext(createChunkResponse(true)); + streamObserver.onCompleted(); + return null; + }).when(xceiverClient).streamRead(any(), any()); + assertThrows(IOException.class, () -> blockStream.read()); + } + + private void createDataAndChecksum() throws OzoneChecksumException { + data = new byte[BLOCK_SIZE]; + new SecureRandom().nextBytes(data); + checksumData = checksum.computeChecksum(data); + } + + private void setupSuccessfulRead() throws IOException, InterruptedException { + doAnswer((InvocationOnMock invocation) -> { + StreamingReaderSpi streamObserver = invocation.getArgument(1); + streamObserver.setStreamingDatanode(MockDatanodeDetails.randomDatanodeDetails()); + streamObserver.beforeStart(requestObserver); + streamObserver.onNext(createChunkResponse(false)); + streamObserver.onCompleted(); + return null; + }).when(xceiverClient).streamRead(any(), any()); + } + + private ContainerProtos.ContainerCommandResponseProto createChunkResponse(boolean invalidChecksum) { + ContainerProtos.ReadBlockResponseProto response = invalidChecksum ? 
+ createInValidChecksumResponse() : createValidResponse(); + + return ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadBlock) + .setReadBlock(response) + .setResult(ContainerProtos.Result.SUCCESS) + .build(); + } + + private ContainerProtos.ReadBlockResponseProto createValidResponse() { + return ContainerProtos.ReadBlockResponseProto.newBuilder() + .setChecksumData(checksumData.getProtoBufMessage()) + .setData(ByteString.copyFrom(data)) + .setOffset(0) + .build(); + } + + private ContainerProtos.ReadBlockResponseProto createInValidChecksumResponse() { + byte[] invalidData = new byte[data.length]; + System.arraycopy(data, 0, invalidData, 0, data.length); + // Corrupt the data + invalidData[0] = (byte) (invalidData[0] + 1); + return ContainerProtos.ReadBlockResponseProto.newBuilder() + .setChecksumData(checksumData.getProtoBufMessage()) + .setData(ByteString.copyFrom(invalidData)) + .setOffset(0) + .build(); + } + + private static Stream exceptionsTriggeringRefresh() { + return Stream.of( + Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)), + Arguments.of(new IOException(new ExecutionException( + new StatusException(Status.UNAVAILABLE)))) + ); + } + + private static Stream exceptionsNotTriggeringRefresh() { + return Stream.of( + Arguments.of(new SCMSecurityException("Security problem")), + Arguments.of(new OzoneChecksumException("checksum missing")), + Arguments.of(new IOException("Some random exception.")) + ); + } + +} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java index dbc42816036e..2fd81fc91dc0 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java @@ -39,7 +39,10 @@ import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; +import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; /** @@ -49,8 +52,9 @@ public class TestBlockInputStreamFactoryImpl { private OzoneConfiguration conf = new OzoneConfiguration(); - @Test - public void testNonECGivesBlockInputStream() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testNonECGivesBlockInputStream(boolean streamReadBlockEnabled) throws IOException { BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl(); ReplicationConfig repConfig = RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE); @@ -62,11 +66,16 @@ public void testNonECGivesBlockInputStream() throws IOException { Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1); OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setChecksumVerify(true); + clientConfig.setStreamReadBlock(streamReadBlockEnabled); BlockExtendedInputStream stream = factory.create(repConfig, blockInfo, blockInfo.getPipeline(), blockInfo.getToken(), null, null, clientConfig); - assertInstanceOf(BlockInputStream.class, stream); + if (streamReadBlockEnabled) { + 
assertInstanceOf(StreamBlockInputStream.class, stream); + } else { + assertInstanceOf(BlockInputStream.class, stream); + } assertEquals(stream.getBlockID(), blockInfo.getBlockID()); assertEquals(stream.getLength(), blockInfo.getLength()); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java index 4c0bb03c1653..2717e8eb3d9d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java @@ -33,6 +33,8 @@ public enum DatanodeVersion implements ComponentVersion { SEPARATE_RATIS_PORTS_AVAILABLE(1, "Version with separated Ratis port."), COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " + "a PutBlock request"), + STREAM_BLOCK_SUPPORT(3, + "This version has support for reading a block by streaming chunks."), FUTURE_VERSION(-1, "Used internally in the client when the server side is " + " newer and an unknown server version has arrived to the client."); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index c07a21680ef7..78ff1feabca0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -418,6 +418,7 @@ public static boolean isReadOnly( switch (proto.getCmdType()) { case ReadContainer: case ReadChunk: + case ReadBlock: case ListBlock: case GetBlock: case GetSmallFile: @@ -478,6 +479,7 @@ public static boolean requireBlockToken(Type cmdType) { case PutBlock: case PutSmallFile: case ReadChunk: + case ReadBlock: case WriteChunk: case FinalizeBlock: return true; @@ -553,6 +555,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) { blockID = msg.getReadChunk().getBlockID(); } break; + case ReadBlock: + if (msg.hasReadBlock()) { + blockID = msg.getReadBlock().getBlockID(); + } + break; case WriteChunk: if (msg.hasWriteChunk()) { blockID = msg.getWriteChunk().getBlockID(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java new file mode 100644 index 000000000000..ea8694cd8b78 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; + +/** + * Streaming read response holding datanode details and + * request observer to send read requests. + */ +public class StreamingReadResponse { + + private final DatanodeDetails dn; + private final ClientCallStreamObserver requestObserver; + + public StreamingReadResponse(DatanodeDetails dn, + ClientCallStreamObserver requestObserver) { + this.dn = dn; + this.requestObserver = requestObserver; + } + + public DatanodeDetails getDatanodeDetails() { + return dn; + } + + public ClientCallStreamObserver getRequestObserver() { + return requestObserver; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java new file mode 100644 index 000000000000..e0c45c956e8f --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientResponseObserver; + +/** + * SPI for streaming reader to set the streaming read response. 
+ */ +public interface StreamingReaderSpi extends ClientResponseObserver + { + + void setStreamingDatanode(DatanodeDetails datanodeDetails); + +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 35e65271bb1c..e008765f7e7e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -144,6 +144,15 @@ public ContainerCommandResponseProto sendCommand( } } + public void streamRead(ContainerCommandRequestProto request, + StreamingReaderSpi streamObserver) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Stream read is not supported"); + } + + public void completeStreamRead() { + throw new UnsupportedOperationException("Stream read is not supported"); + } + public static IOException getIOExceptionForSendCommand( ContainerCommandRequestProto request, Exception e) { return new IOException("Failed to execute command " diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java index 5e547898662f..3fc97e97ffaa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBufferToByteString; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; @@ -334,6 +335,20 @@ public static ContainerCommandResponseProto getReadChunkResponse( .build(); } + public static ContainerCommandResponseProto getReadBlockResponse( + ContainerCommandRequestProto request, ChecksumData checksumData, ByteBuffer data, long offset) { + + ContainerProtos.ReadBlockResponseProto response = ContainerProtos.ReadBlockResponseProto.newBuilder() + .setChecksumData(checksumData.getProtoBufMessage()) + .setData(ByteString.copyFrom(data)) + .setOffset(offset) + .build(); + + return getSuccessResponseBuilder(request) + .setReadBlock(response) + .build(); + } + public static ContainerCommandResponseProto getFinalizeBlockResponse( ContainerCommandRequestProto msg, BlockData data) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index a934fc513720..c6e5d75b5caf 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -55,12 +55,14 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.hdds.scm.StreamingReaderSpi; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; @@ -904,4 +906,47 @@ public static List toValidatorList(Validator validator) { return datanodeToResponseMap; } + /** + * Calls the container protocol to read a block, streaming its data back as a sequence of responses. + * + * @param xceiverClient client to perform call + * @param offset offset within the block at which to start reading + * @param blockID ID of the block + * @param token a token for this block (may be null) + * @param replicaIndexes replica index of each datanode in the pipeline + * @param streamObserver observer that receives the streamed read responses + * @throws IOException if there is an I/O error while performing the call + */ + @SuppressWarnings("checkstyle:ParameterNumber") + public static void readBlock( + XceiverClientSpi xceiverClient, long offset, BlockID blockID, Token token, + Map replicaIndexes, StreamingReaderSpi streamObserver) + throws IOException, InterruptedException { + final ReadBlockRequestProto.Builder readBlockRequest = + ReadBlockRequestProto.newBuilder() + .setOffset(offset); + final ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock) + .setContainerID(blockID.getContainerID()); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); + } + + readBlock(xceiverClient, blockID, builder, readBlockRequest, xceiverClient.getPipeline().getFirstNode(), + replicaIndexes, streamObserver); + } + + private static void readBlock(XceiverClientSpi xceiverClient, + BlockID blockID, ContainerCommandRequestProto.Builder builder, ReadBlockRequestProto.Builder readBlockBuilder, + DatanodeDetails datanode, Map replicaIndexes, + StreamingReaderSpi streamObserver) throws IOException, InterruptedException { + final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); + int replicaIndex = replicaIndexes.getOrDefault(datanode, 0); + if (replicaIndex > 0) { + datanodeBlockID.setReplicaIndex(replicaIndex); + } + readBlockBuilder.setBlockID(datanodeBlockID); + final ContainerCommandRequestProto request = builder + .setDatanodeUuid(datanode.getUuidString()) + .setReadBlock(readBlockBuilder).build(); + xceiverClient.streamRead(request, streamObserver); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index 091173356176..61d1c49da042 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -43,7 +43,8 @@ public enum DNAction implements AuditAction { STREAM_INIT, FINALIZE_BLOCK, ECHO, - GET_CONTAINER_CHECKSUM_INFO; + GET_CONTAINER_CHECKSUM_INFO, +
READ_BLOCK; @Override public String getAction() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index ea47c4945b8c..6dba6abf9d09 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; +import io.opentelemetry.api.trace.Span; import java.io.File; import java.io.IOException; import java.util.Collections; @@ -52,6 +53,7 @@ import org.apache.hadoop.hdds.security.token.NoopTokenVerifier; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; +import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; import org.apache.hadoop.ozone.audit.AuditAction; import org.apache.hadoop.ozone.audit.AuditEventStatus; @@ -75,6 +77,8 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -812,6 +816,76 @@ public StateMachine.DataChannel getStreamDataChannel( } } + @Override + public void streamDataReadOnly(ContainerCommandRequestProto msg, + StreamObserver streamObserver, + DispatcherContext dispatcherContext) { + Type cmdType = msg.getCmdType(); + String traceID = msg.getTraceID(); + Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID); + AuditAction action = getAuditAction(msg.getCmdType()); + EventType eventType = getEventType(msg); + + try (UncheckedAutoCloseable ignored = protocolMetrics.measure(cmdType)) { + Preconditions.checkNotNull(msg); + if (LOG.isTraceEnabled()) { + LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(), traceID); + } + + PerformanceStringBuilder perf = new PerformanceStringBuilder(); + ContainerCommandResponseProto responseProto = null; + long containerID = msg.getContainerID(); + Container container = getContainer(containerID); + long startTime = Time.monotonicNow(); + + if (DispatcherContext.op(dispatcherContext).validateToken()) { + validateToken(msg); + } + if (getMissingContainerSet().contains(containerID)) { + throw new StorageContainerException( + "ContainerID " + containerID + + " has been lost and cannot be recreated on this DataNode", + ContainerProtos.Result.CONTAINER_MISSING); + } + if (container == null) { + throw new StorageContainerException("ContainerID " + containerID + " does not exist", + ContainerProtos.Result.CONTAINER_NOT_FOUND); + } + ContainerType containerType = getContainerType(container); + Handler handler = getHandler(containerType); + if (handler == null) { + throw new StorageContainerException("Invalid ContainerType " + containerType, + ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); + } + perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime); + responseProto = handler.readBlock(msg, container, dispatcherContext, streamObserver); + long oPLatencyMS = Time.monotonicNow() -
startTime; + metrics.incContainerOpsLatencies(cmdType, oPLatencyMS); + if (responseProto == null) { + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.SUCCESS, null); + } else { + containerSet.scanContainer(containerID, "ReadBlock failed " + responseProto.getResult()); + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, + new Exception(responseProto.getMessage())); + streamObserver.onNext(responseProto); + } + perf.appendOpLatencyMs(oPLatencyMS); + performanceAudit(action, msg, dispatcherContext, perf, oPLatencyMS); + + } catch (StorageContainerException sce) { + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce); + streamObserver.onNext(ContainerUtils.logAndReturnError(LOG, sce, msg)); + } catch (IOException ioe) { + final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED + + " for " + dispatcherContext + ": " + ioe.getMessage(); + final StorageContainerException sce = new StorageContainerException( + s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED); + streamObserver.onNext(ContainerUtils.logAndReturnError(LOG, sce, msg)); + } finally { + span.end(); + } + } + private static DNAction getAuditAction(Type cmdType) { switch (cmdType) { case CreateContainer : return DNAction.CREATE_CONTAINER; @@ -836,6 +910,7 @@ private static DNAction getAuditAction(Type cmdType) { case FinalizeBlock : return DNAction.FINALIZE_BLOCK; case Echo : return DNAction.ECHO; case GetContainerChecksumInfo: return DNAction.GET_CONTAINER_CHECKSUM_INFO; + case ReadBlock : return DNAction.READ_BLOCK; default : LOG.debug("Invalid command type - {}", cmdType); return null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java index 1c3071a3791e..8a4a675187df 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; /** * Dispatcher acts as the bridge between the transport layer and @@ -89,4 +90,14 @@ default StateMachine.DataChannel getStreamDataChannel( throw new UnsupportedOperationException( "getStreamDataChannel not supported."); } + + /** + * Streams data back to the client in chunks for read-only requests such as ReadBlock.
+ */ + default void streamDataReadOnly( + ContainerCommandRequestProto msg, + StreamObserver streamObserver, + DispatcherContext dispatcherContext) { + throw new UnsupportedOperationException("streamDataReadOnly not supported."); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 4337d667618f..0abcab5afea1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -40,9 +40,11 @@ import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; /** * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type @@ -264,4 +266,9 @@ public void setClusterID(String clusterID) { this.clusterId = clusterID; } + public abstract ContainerCommandResponseProto readBlock( + ContainerCommandRequestProto msg, Container container, + DispatcherContext dispatcherContext, + StreamObserver streamObserver); + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java index 6728744d147a..e5499ed5d3a4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -90,6 +90,12 @@ private static void addZeroCopyMethod( newServiceBuilder.addMethod(newMethod, serverCallHandler); } + @Override + public void streamBlock(ContainerCommandRequestProto request, + StreamObserver responseObserver) { + dispatcher.streamDataReadOnly(request, responseObserver, null); + } + @Override public StreamObserver send( StreamObserver responseObserver) { @@ -104,8 +110,12 @@ public void onNext(ContainerCommandRequestProto request) { .build(); try { - final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context); - responseObserver.onNext(resp); + if (request.getCmdType() == Type.ReadBlock) { + dispatcher.streamDataReadOnly(request, responseObserver, null); + } else { + final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context); + responseObserver.onNext(resp); + } } catch (Throwable e) { LOG.error("Got exception when processing" + " ContainerCommandRequestProto {}", request, e); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java index f9ee0a4bd0f5..e0f0ccbe193a 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java @@ -34,6 +34,8 @@ public final class DispatcherContext { private static final DispatcherContext HANDLE_READ_CHUNK = newBuilder(Op.HANDLE_READ_CHUNK).build(); + private static final DispatcherContext HANDLE_READ_BLOCK + = newBuilder(Op.HANDLE_READ_BLOCK).build(); private static final DispatcherContext HANDLE_WRITE_CHUNK = newBuilder(Op.HANDLE_WRITE_CHUNK).build(); private static final DispatcherContext HANDLE_GET_SMALL_FILE @@ -60,6 +62,10 @@ public static DispatcherContext getHandleReadChunk() { return HANDLE_READ_CHUNK; } + public static DispatcherContext getHandleReadBlock() { + return HANDLE_READ_BLOCK; + } + public static DispatcherContext getHandleWriteChunk() { return HANDLE_WRITE_CHUNK; } @@ -92,6 +98,7 @@ public enum Op { NULL, HANDLE_READ_CHUNK, + HANDLE_READ_BLOCK, HANDLE_WRITE_CHUNK, HANDLE_GET_SMALL_FILE, HANDLE_PUT_SMALL_FILE, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 584cb98b367d..fbf0d6aa8f84 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -46,6 +46,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse; @@ -58,6 +59,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST; import static org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline; import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -68,7 +70,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; @@ -76,6 +80,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -102,6 +107,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -121,6 +127,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.ChunkBufferToByteString; import org.apache.hadoop.ozone.common.OzoneChecksumException; @@ -162,6 +169,8 @@ import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.grpc.stub.ServerCallStreamObserver; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,6 +181,7 @@ public class KeyValueHandler extends Handler { private static final Logger LOG = LoggerFactory.getLogger( KeyValueHandler.class); + private static final int STREAMING_BYTES_PER_CHUNK = 1024 * 64; private final BlockManager blockManager; private final ChunkManager chunkManager; @@ -2046,6 +2056,125 @@ public void deleteUnreferenced(Container container, long localID) } } + @Override + public ContainerCommandResponseProto readBlock( + ContainerCommandRequestProto request, Container kvContainer, + DispatcherContext dispatcherContext, + StreamObserver streamObserver) { + + if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) { + return ContainerUtils.logAndReturnError(LOG, + new StorageContainerException("Only File Per Block is supported", IO_EXCEPTION), request); + } + + ContainerCommandResponseProto responseProto = null; + if (!request.hasReadBlock()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Malformed Read Block request. trace ID: {}", request.getTraceID()); + } + return malformedRequest(request); + } + try { + ReadBlockRequestProto readBlock = request.getReadBlock(); + + BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID()); + // This is a new API, so the block should always be checked. + BlockUtils.verifyReplicaIdx(kvContainer, blockID); + BlockUtils.verifyBCSId(kvContainer, blockID); + + File blockFile = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused"); + + BlockData blockData = getBlockManager().getBlock(kvContainer, blockID); + List chunkInfos = blockData.getChunks(); + // To get the chunk size, check the first chunk. Either there is only 1 chunk and it's the largest, or there are + // multiple chunks and they are all the same size except the last one. + long bytesPerChunk = chunkInfos.get(0).getLen(); + // The bytes per checksum is stored in the checksum data of each chunk, so check the first chunk as they all + // must be the same.
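// Illustrative example (hypothetical values): a 5 MB block written with 4 MB chunks and CRC32 every 64 KB
// has two chunks; chunkInfos.get(0).getLen() == 4 MB gives the chunk size, and the first chunk's checksum
// data gives bytesPerChecksum == 64 KB. With ChecksumType.NONE no checksums are stored, so the code below
// falls back to STREAMING_BYTES_PER_CHUNK (64 KB) slices with an empty checksum list.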
+ ContainerProtos.ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType(); + ChecksumData noChecksumChecksumData; + int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK; + if (checksumType == ContainerProtos.ChecksumType.NONE) { + noChecksumChecksumData = new ChecksumData(checksumType, 0); + } else { + noChecksumChecksumData = null; + bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum(); + } + final int finalBytesPerChecksum = bytesPerChecksum; + // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to the + // previous checksum boundary. + // eg if bytesPerChecksum is 512, and the requested offset is 600, we have to move back to 512. + // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice and + // it simplifies the code to always do this. + + ServerCallStreamObserver serverCallStreamObserver = + ((ServerCallStreamObserver) streamObserver); + Runnable dataStream = new Runnable() { + + private final RandomAccessFile file = new RandomAccessFile(blockFile, "r"); + private final FileChannel channel = file.getChannel(); + private final ByteBuffer buffer = ByteBuffer.allocate(finalBytesPerChecksum); + private long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % finalBytesPerChecksum; + + @Override + public void run() { + try { + channel.position(adjustedOffset); + while (serverCallStreamObserver.isReady() && !serverCallStreamObserver.isCancelled()) { + int read = channel.read(buffer); + if (read == -1) { + serverCallStreamObserver.onCompleted(); + channel.close(); + file.close(); + break; + } + buffer.flip(); + ChecksumData checksumData; + if (checksumType != ContainerProtos.ChecksumType.NONE) { + // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from + // and its offset to pull out the correct checksum bytes for each read. 
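// Illustrative example (hypothetical values): with bytesPerChunk = 4 MB, bytesPerChecksum = 64 KB and
// adjustedOffset = 5 MB, chunkIndex = 1, chunkOffset = 1 MB and checksumIndex = 16, i.e. the 17th stored
// checksum of the second chunk is the one covering this 64 KB slice.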
+ int chunkIndex = (int) (adjustedOffset / bytesPerChunk); + int chunkOffset = (int) (adjustedOffset % bytesPerChunk); + int checksumIndex = chunkOffset / finalBytesPerChecksum; + ByteString checksum = + blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex); + checksumData = + new ChecksumData(checksumType, finalBytesPerChecksum, Collections.singletonList(checksum)); + } else { + checksumData = noChecksumChecksumData; + } + serverCallStreamObserver.onNext(getReadBlockResponse(request, checksumData, buffer, adjustedOffset)); + adjustedOffset += finalBytesPerChecksum; + buffer.clear(); + } + if (serverCallStreamObserver.isCancelled()) { + channel.close(); + file.close(); + } + } catch (IOException e) { + LOG.error("Error while reading block file {}", blockFile, e); + serverCallStreamObserver.onError(e); + try { + channel.close(); + file.close(); + } catch (IOException ioException) { + LOG.error("Error while closing file/channel for block file {}", blockFile, ioException); + } + } + } + }; + serverCallStreamObserver.setOnReadyHandler(dataStream); + dataStream.run(); + // TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen()); + } catch (StorageContainerException ex) { + responseProto = ContainerUtils.logAndReturnError(LOG, ex, request); + } catch (IOException ioe) { + responseProto = ContainerUtils.logAndReturnError(LOG, + new StorageContainerException("Read Block failed", ioe, IO_EXCEPTION), request); + } + return responseProto; + } + @Override public void addFinalizedBlock(Container container, long localID) { KeyValueContainer keyValueContainer = (KeyValueContainer)container; @@ -2084,7 +2213,7 @@ private boolean logBlocksIfNonZero(Container container) } if (nonZero) { LOG.error("blocks in rocksDB on container delete: {}", - stringBuilder.toString()); + stringBuilder); } } return nonZero; diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index bd890eae64a8..8ef8705c713e 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -77,6 +77,8 @@ package hadoop.hdds.datanode; * 18. CopyContainer - Copies a container from a remote machine. * * 19. FinalizeBlock - Finalize block request from client. + * + * 20. ReadBlock - Allows us to read a block. 
*/ enum Type { @@ -108,6 +110,7 @@ enum Type { FinalizeBlock = 21; Echo = 22; GetContainerChecksumInfo = 23; + ReadBlock = 24; } @@ -218,6 +221,7 @@ message ContainerCommandRequestProto { optional FinalizeBlockRequestProto finalizeBlock = 25; optional EchoRequestProto echo = 26; optional GetContainerChecksumInfoRequestProto getContainerChecksumInfo = 27; + optional ReadBlockRequestProto readBlock = 28; } message ContainerCommandResponseProto { @@ -250,6 +254,7 @@ message ContainerCommandResponseProto { optional FinalizeBlockResponseProto finalizeBlock = 22; optional EchoResponseProto echo = 23; optional GetContainerChecksumInfoResponseProto getContainerChecksumInfo = 24; + optional ReadBlockResponseProto readBlock = 25; } message ContainerDataProto { @@ -393,6 +398,17 @@ message ListBlockResponseProto { repeated BlockData blockData = 1; } +message ReadBlockRequestProto { + required DatanodeBlockID blockID = 1; + required uint64 offset = 2; +} + +message ReadBlockResponseProto { + required ChecksumData checksumData = 1; + required uint64 offset = 2; + required bytes data = 3; +} + message EchoRequestProto { optional bytes payload = 1; optional int32 payloadSizeResp = 2; @@ -584,7 +600,7 @@ service XceiverClientProtocolService { // A client-to-datanode RPC to send container commands rpc send(stream ContainerCommandRequestProto) returns (stream ContainerCommandResponseProto) {}; - + rpc streamBlock(ContainerCommandRequestProto) returns (stream ContainerCommandResponseProto); } service IntraDatanodeProtocolService { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java new file mode 100644 index 000000000000..bb66a303155e --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.client.rpc.read; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyInputStream; +import org.apache.hadoop.ozone.om.TestBucket; +import org.junit.jupiter.api.Test; + +/** + * Tests {@link StreamBlockInputStream}. + */ +public class TestStreamBlockInputStream extends TestInputStreamBase { + /** + * Run the tests as a single test method to avoid needing a new mini-cluster + * for each test. + */ + private static final int DATA_LENGTH = (2 * BLOCK_SIZE) + (CHUNK_SIZE); + private byte[] inputData; + private TestBucket bucket; + + @Test + void testAll() throws Exception { + try (MiniOzoneCluster cluster = newCluster()) { + cluster.waitForClusterToBeReady(); + + OzoneConfiguration conf = cluster.getConf(); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamReadBlock(true); + OzoneConfiguration copy = new OzoneConfiguration(conf); + copy.setFromObject(clientConfig); + String keyName = getNewKeyName(); + try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) { + bucket = TestBucket.newBuilder(client).build(); + inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH); + testReadKeyFully(keyName); + testSeek(keyName); + testReadEmptyBlock(); + } + keyName = getNewKeyName(); + clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE); + copy.setFromObject(clientConfig); + try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) { + bucket = TestBucket.newBuilder(client).build(); + inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH); + testReadKeyFully(keyName); + testSeek(keyName); + } + } + } + + /** + * Test to verify that data read from blocks is stored in a list of buffers + * with max capacity equal to the bytes per checksum. 
+ */ + private void testReadKeyFully(String key) throws Exception { + // Read the data fully into a large enough byte array + try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) { + byte[] readData = new byte[DATA_LENGTH]; + int totalRead = keyInputStream.read(readData, 0, DATA_LENGTH); + assertEquals(DATA_LENGTH, totalRead); + for (int i = 0; i < DATA_LENGTH; i++) { + assertEquals(inputData[i], readData[i], + "Read data is not same as written data at index " + i); + } + } + // Read the data 1 byte at a time + try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) { + for (int i = 0; i < DATA_LENGTH; i++) { + int b = keyInputStream.read(); + assertEquals(inputData[i], (byte) b, + "Read data is not same as written data at index " + i); + } + } + // Read the data into a large enough ByteBuffer + try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) { + ByteBuffer readBuf = ByteBuffer.allocate(DATA_LENGTH); + int totalRead = keyInputStream.read(readBuf); + assertEquals(DATA_LENGTH, totalRead); + readBuf.flip(); + for (int i = 0; i < DATA_LENGTH; i++) { + assertEquals(inputData[i], readBuf.get(), + "Read data is not same as written data at index " + i); + } + } + } + + private void testSeek(String key) throws IOException { + java.util.Random random = new java.util.Random(); + try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) { + for (int i = 0; i < 100; i++) { + int position = random.nextInt(DATA_LENGTH); + keyInputStream.seek(position); + int b = keyInputStream.read(); + assertEquals(inputData[position], (byte) b, "Read data is not same as written data at index " + position); + } + StreamBlockInputStream blockStream = (StreamBlockInputStream) keyInputStream.getPartStreams().get(0); + long length = blockStream.getLength(); + blockStream.seek(10); + long position = blockStream.getPos(); + assertThrows(IOException.class, () -> blockStream.seek(length + 1), + "Seek beyond block length should throw exception"); + assertThrows(IOException.class, () -> blockStream.seek(-1), + "Seeking to a negative position should throw exception"); + assertEquals(position, blockStream.getPos(), + "Position should not change after failed seek attempts"); + } + } + + private void testReadEmptyBlock() throws Exception { + String keyName = getNewKeyName(); + bucket.writeRandomBytes(keyName, 0); + try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) { + assertTrue(keyInputStream.getPartStreams().isEmpty()); + assertEquals(-1, keyInputStream.read()); + } + } +}
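For anyone trying the new read path end to end, a minimal client-side sketch follows. It assumes only the OzoneClientConfig#setStreamReadBlock setter and the BlockInputStreamFactoryImpl behaviour added above; the class name and the volume, bucket and key names are hypothetical.

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.OzoneClientConfig;
    import org.apache.hadoop.ozone.client.OzoneClient;
    import org.apache.hadoop.ozone.client.OzoneClientFactory;

    public final class StreamReadBlockExample {
      public static void main(String[] args) throws IOException {
        // Opt in to streaming ReadBlock on the client; the default remains false.
        OzoneConfiguration conf = new OzoneConfiguration();
        OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
        clientConfig.setStreamReadBlock(true);
        conf.setFromObject(clientConfig);

        try (OzoneClient client = OzoneClientFactory.getRpcClient(conf);
             InputStream in = client.getObjectStore()
                 .getVolume("vol1")      // hypothetical volume
                 .getBucket("bucket1")   // hypothetical bucket
                 .readKey("key1")) {     // hypothetical key
          byte[] buf = new byte[64 * 1024];
          int n;
          while ((n = in.read(buf)) != -1) {
            // Each fill of buf is served from ReadBlock responses streamed by the datanode.
          }
        }
      }
    }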